时间轮所有任务共用一个回调函数

pull/21/merge
ouqiang 2017-05-17 11:49:54 +08:00
parent 93bb9f90b1
commit 28eda835c0
2 changed files with 27 additions and 15 deletions

View File

@ -16,7 +16,7 @@ type DelayTask struct {}
// 从数据库中取出所有延迟任务 // 从数据库中取出所有延迟任务
func (task *DelayTask) Initialize(tick time.Duration, slots int) { func (task *DelayTask) Initialize(tick time.Duration, slots int) {
tw = timewheel.New(tick, slots) tw = timewheel.New(tick, slots, task.Run)
tw.Start() tw.Start()
taskModel := new(models.DelayTask) taskModel := new(models.DelayTask)
currentTime := time.Now() currentTime := time.Now()
@ -57,18 +57,28 @@ func (task *DelayTask) Add(taskModel models.DelayTask) {
currentTimestamp := time.Now().Unix() currentTimestamp := time.Now().Unix()
execTimestamp := taskModel.Created.Unix() + int64(taskModel.Delay) execTimestamp := taskModel.Created.Unix() + int64(taskModel.Delay)
// 时间过期, 立即执行任务 // 时间过期, 立即执行任务
data := []interface{}{taskModel.Id, taskModel.Url, taskModel.Params}
if execTimestamp <= currentTimestamp { if execTimestamp <= currentTimestamp {
go task.Run(taskModel.Id, taskModel.Url, taskModel.Params) go task.Run(data)
return return
} }
delay := execTimestamp - currentTimestamp delay := execTimestamp - currentTimestamp
tw.Add(time.Duration(delay) * time.Second, func() { tw.Add(time.Duration(delay) * time.Second, data)
task.Run(taskModel.Id, taskModel.Url, taskModel.Params)
})
} }
// 运行任务 // 运行任务
func (task *DelayTask) Run(id int64, url, params string) { func (task *DelayTask) Run(data []interface{}) {
if len(data) < 3 {
logger.Errorf("延时任务开始执行#参数不足#%+v", data)
return
}
id := data[0].(int64)
url := data[1].(string)
params := data[2].(string)
if id <= 0 || url == "" {
logger.Errorf("延时任务开始执行#参数为空#%+v", data)
return
}
taskModel := new(models.DelayTask) taskModel := new(models.DelayTask)
_, err := taskModel.UpdateStatus(id, models.Running) _, err := taskModel.UpdateStatus(id, models.Running)
if err != nil { if err != nil {

View File

@ -7,12 +7,15 @@ import (
// @author qiang.ou<qingqianludao@gmail.com> // @author qiang.ou<qingqianludao@gmail.com>
type Job func([]interface{})
type TimeWheel struct { type TimeWheel struct {
interval time.Duration interval time.Duration
ticker *time.Ticker ticker *time.Ticker
slots []*list.List slots []*list.List
currentPos int currentPos int
slotNum int slotNum int
job Job
taskChannel chan Task taskChannel chan Task
stopChannel chan bool stopChannel chan bool
} }
@ -21,19 +24,18 @@ type TimeWheel struct {
type Task struct { type Task struct {
delay time.Duration delay time.Duration
circle int circle int
job Job data []interface{}
} }
type Job func() func New(interval time.Duration, slotNum int, job Job) *TimeWheel {
if interval <= 0 || slotNum <= 0 || job == nil {
func New(interval time.Duration, slotNum int) *TimeWheel {
if interval <= 0 || slotNum <= 0 {
return nil return nil
} }
tw := &TimeWheel{ tw := &TimeWheel{
interval: interval, interval: interval,
slots: make([]*list.List, slotNum), slots: make([]*list.List, slotNum),
currentPos: 0, currentPos: 0,
job: job,
slotNum: slotNum, slotNum: slotNum,
taskChannel: make(chan Task), taskChannel: make(chan Task),
stopChannel: make(chan bool), stopChannel: make(chan bool),
@ -55,11 +57,11 @@ func (tw *TimeWheel) Start() {
go tw.start() go tw.start()
} }
func (tw *TimeWheel) Add(delay time.Duration, job Job) { func (tw *TimeWheel) Add(delay time.Duration, data []interface{}) {
if delay < 0 || job == nil { if delay <= 0 {
return return
} }
tw.taskChannel <- Task{delay:delay, job: job} tw.taskChannel <- Task{delay:delay, data: data}
} }
func (tw *TimeWheel) Stop() { func (tw *TimeWheel) Stop() {
@ -99,7 +101,7 @@ func (tw *TimeWheel) scanAndRunTask(l *list.List) {
continue continue
} }
go task.job() go tw.job(task.data)
next := e.Next() next := e.Next()
l.Remove(e) l.Remove(e)
e = next e = next