在定时任务中,有这样一个场景,定时给某些用户发送消息,或者定时给某些数据进行对账,而这些场景有一个要求是,下次处理是上次处理的定时间隔(比如下次的1分钟)等,这样就会出现,每秒处理的数据和用户是不一样的。比如13:13:13 对数据A处理失败了(处理失败常见,非常规意义的失败),那下一分钟接着处理A(13:14:13),13:13:14 对数据B处理失败了那下一分钟接着处理B(13:14:14)。这样就会发现,每分钟的60秒都在处理一批批数据。类似于时间轮一样的轮循。

当然,也可以使用消息队列的延时队列,但这种情况会造成,一个消息的堆积,而且无法处理这种清空,比如A,B的任务相差一分钟,那在A的2分钟后是发送A的第2次,而B是第1次,这两次是同时发送的,消息队列比较难处理这种情况,同时对于消息的标记也比较麻烦。
方案:可以使用go里的定时器,每秒触发一次任务,对任务进行轮循处理,简单的demo如下:(这里忽略了任务的进入,具体可以使用kafka结合到达的时间点来标记队列的轮片)
package main
import (
"fmt"
"os"
"os/signal"
"syscall"
"time"
)
type Task struct {
msg string //任务的内容, 具体可以是一个复杂的结构体对象
pri int // 任务的优先级,在对同一个bucket的数据,可以按照优先级来处理
idx int // bucket 的标识
status bool // 任务标识,标识任务是否执行成功,是否需要删除
}
func (t *Task) runTask() { //简单的执行任务
fmt.Println("run message", t.msg)
t.status = true
}
var taskList = map[int][]Task{}
func sendTask(idx int) {
msg := fmt.Sprintf("task message %d", idx)
pri := idx / 60
idx = idx % 60
task := Task{
msg,
pri,
idx,
false,
}
taskList[idx] = append(taskList[idx], task)
}
/**
* 假设 i是任务的id号,表示有一个150个任务要进如队列审核
*/
func initTask() {
for i := 0; i < 150; i++ {
sendTask(i)
}
}
var ticker = time.NewTicker(1 * time.Second)
var cc = 0 //轮片指针
func main() {
c := make(chan os.Signal)
status:=true
signal.Notify(c,
syscall.SIGKILL,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGQUIT,
os.Interrupt,
os.Kill,
)
initTask()
go func() {
for {
select {
case <-ticker.C:
for _, t := range taskList[cc] {
if t.status == false {
t.runTask()
}
}
cc += 1
cc = cc%60 //循环轮询
case <-c: //监听 信号
ticker.Stop()
fmt.Println("kill task")
status = false
break
}
}
}()
for {// 常驻
time.Sleep(1*time.Second)
if status == false {
break
}
}
}
如有不对的地方,请指正
网友评论