gocron/service/delay_task.go

133 lines
4.1 KiB
Go

package service
import (
"github.com/ouqiang/gocron/models"
"time"
"github.com/ouqiang/gocron/modules/logger"
"math"
"github.com/ouqiang/gocron/modules/httpclient"
"strings"
"github.com/ouqiang/timewheel"
"fmt"
"github.com/ouqiang/gocron/modules/app"
)
var tw *timewheel.TimeWheel
type DelayTask struct {}
// 从数据库中取出所有延迟任务
func (task *DelayTask) Initialize(tick time.Duration, slots int) {
tw = timewheel.New(tick, slots, task.Run)
tw.Start()
taskModel := new(models.DelayTask)
currentTime := time.Now()
taskNum, err := taskModel.ActiveNum(currentTime)
if err != nil {
logger.Error("延迟任务初始化#获取待执行的任务失败", err)
return
}
if taskNum == 0 {
logger.Debugf("延迟任务初始化#待执行的任务数量为0")
return
}
pageSize := 100
totalPage := int( math.Ceil(float64(taskNum) / float64(pageSize)) )
logger.Infof("延迟任务初始化#待执行的任务数量-%d#共%d页#每页取%d条", taskNum, totalPage, pageSize)
taskModel.PageSize = pageSize
for page := 1; page <= totalPage; page++ {
taskModel.Page = page
logger.Debugf("延迟任务初始化#取出任务列表#第%d页", page)
taskList, err := taskModel.ActiveList(currentTime)
if err != nil {
logger.Error("延迟任务初始化#获取任务列表失败", err)
}
task.BatchAdd(taskList)
}
logger.Info("延迟任务初始化完成")
}
// 批量添加任务
func (task *DelayTask) BatchAdd(taskList []models.DelayTask) {
for _, item := range(taskList) {
task.Add(item)
}
}
// 添加任务
func (task *DelayTask) Add(taskModel models.DelayTask) {
currentTimestamp := time.Now().Unix()
execTimestamp := taskModel.Created.Unix() + int64(taskModel.Delay)
// 时间过期, 立即执行任务
data := []interface{}{taskModel.Id, taskModel.Url, taskModel.Params}
if execTimestamp <= currentTimestamp {
go task.Run(data)
return
}
delay := execTimestamp - currentTimestamp
tw.Add(time.Duration(delay) * time.Second, data)
}
// 运行任务
func (task *DelayTask) Run(data []interface{}) {
if len(data) < 3 {
logger.Errorf("延时任务开始执行#参数不足#%+v", data)
return
}
id := data[0].(int64)
url := data[1].(string)
params := data[2].(string)
if id <= 0 || url == "" {
logger.Errorf("延时任务开始执行#参数为空#%+v", data)
return
}
taskModel := new(models.DelayTask)
_, err := taskModel.UpdateStatus(id, models.Running)
if err != nil {
logger.Error("延迟任务开始执行#更新任务状态失败", err)
return
}
timeout := 300
tryTimes := 3
success := false
logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params)
for i := 0; i < tryTimes; {
response := httpclient.PostParams(url, params, timeout)
if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{
success = true
break;
}
i++
if i < tryTimes {
msg := fmt.Sprintf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s",
i,id,response.StatusCode,response.Body)
logger.Error(msg)
FailureNotify(msg)
time.Sleep(30 * time.Second)
}
}
logger.Infof("延迟任务执行完成#id-%d", id)
var status models.Status
if success {
status = models.Finish
} else {
status = models.Failure
}
_ ,err = taskModel.UpdateStatus(id, status)
if err != nil {
logger.Error("延迟任务执行完成#更新任务状态失败", err)
}
}
func (task *DelayTask) Stop() {
tw.Stop()
}
func FailureNotify(message string) {
notifyUrl := app.Setting.Key("delay.task.failure.notify.url").String()
notifyUrl = strings.TrimSpace(notifyUrl)
if notifyUrl != "" {
params := fmt.Sprintf("error=%s", message)
httpclient.PostParams(notifyUrl, params, 60)
}
}