15.2 太牛了!任务超时控制和重试策略竟然还能这样实现?
在分布式任务调度系统中,任务超时控制和重试策略是确保系统稳定性和任务可靠执行的重要机制。今天我们将深入探讨如何实现这些关键功能。
任务超时控制机制
任务超时控制是防止任务执行时间过长而阻塞系统资源的重要手段。我们需要实现精确的超时检测和优雅的超时中断机制。
packagetimeoutimport("context""fmt""sync""time")// TimeoutManager 超时管理器typeTimeoutManagerstruct{tasksmap[string]*TaskTimeoutInfo mu sync.RWMutex stopChchanstruct{}}// TaskTimeoutInfo 任务超时信息typeTaskTimeoutInfostruct{TaskIDstringDeadline time.Time Timeout time.Duration CancelFunc context.CancelFunc Callback TimeoutCallback Status TimeoutStatus}// TimeoutStatus 超时状态typeTimeoutStatusintconst(TimeoutStatusPending TimeoutStatus=iotaTimeoutStatusRunning TimeoutStatusTimeout TimeoutStatusCompleted)// TimeoutCallback 超时回调函数typeTimeoutCallbackfunc(taskIDstring)error// NewTimeoutManager 创建超时管理器funcNewTimeoutManager()*TimeoutManager{tm:=&TimeoutManager{tasks:make(map[string]*TaskTimeoutInfo),stopCh:make(chanstruct{}),}// 启动超时检查协程gotm.run()returntm}// AddTask 添加任务超时监控func(tm*TimeoutManager)AddTask(taskIDstring,timeout time.Duration,callback TimeoutCallback)error{tm.mu.Lock()defertm.mu.Unlock()// 检查任务是否已存在if_,exists:=tm.tasks[taskID];exists{returnfmt.Errorf("task %s already exists",taskID)}// 创建上下文和取消函数ctx,cancel:=context.WithTimeout(context.Background(),timeout)// 创建任务超时信息taskInfo:=&TaskTimeoutInfo{TaskID:taskID,Deadline:time.Now().Add(timeout),Timeout:timeout,CancelFunc:cancel,Callback:callback,Status:TimeoutStatusPending,}tm.tasks[taskID]=taskInforeturnnil}// StartTask 开始任务超时监控func(tm*TimeoutManager)StartTask(taskIDstring)error{tm.mu.Lock()defertm.mu.Unlock()taskInfo,exists:=tm.tasks[taskID]if!exists{returnfmt.Errorf("task %s not found",taskID)}taskInfo.Status=TimeoutStatusRunning taskInfo.Deadline=time.Now().Add(taskInfo.Timeout)returnnil}// CompleteTask 完成任务func(tm*TimeoutManager)CompleteTask(taskIDstring)error{tm.mu.Lock()defertm.mu.Unlock()taskInfo,exists:=tm.tasks[taskID]if!exists{returnfmt.Errorf("task %s not found",taskID)}// 取消上下文iftaskInfo.CancelFunc!=nil{taskInfo.CancelFunc()}taskInfo.Status=TimeoutStatusCompleted// 从监控列表中移除delete(tm.tasks,taskID)returnnil}// run 运行超时检查func(tm*TimeoutManager)run(){ticker:=time.NewTicker(100*time.Millisecond)deferticker.Stop()for{select{case<-ticker.C:tm.checkTimeouts()case<-tm.stopCh:return}}}// checkTimeouts 检查超时任务func(tm*TimeoutManager)checkTimeouts(){tm.mu.Lock()defertm.mu.Unlock()now:=time.Now()vartimeoutTasks[]*TaskTimeoutInfo// 查找超时任务for_,taskInfo:=rangetm.tasks{iftaskInfo.Status==TimeoutStatusRunning&&now.After(taskInfo.Deadline){timeoutTasks=append(timeoutTasks,taskInfo)}}// 处理超时任务for_,taskInfo:=rangetimeoutTasks{tm.handleTimeout(taskInfo)}}// handleTimeout 处理超时任务func(tm*TimeoutManager)handleTimeout(taskInfo*TaskTimeoutInfo){taskInfo.Status=TimeoutStatusTimeout// 取消上下文iftaskInfo.CancelFunc!=nil{taskInfo.CancelFunc()}// 调用回调函数iftaskInfo.Callback!=nil{gofunc(){iferr:=taskInfo.Callback(taskInfo.TaskID);err!=nil{fmt.Printf("Timeout callback failed for task %s: %v\n",taskInfo.TaskID,err)}}()}</