gocron/service/task.go

390 lines
10 KiB
Go
Raw Normal View History

2017-03-10 09:24:06 +00:00
package service
import (
2017-04-07 01:22:00 +00:00
"github.com/ouqiang/gocron/models"
2017-04-02 02:38:49 +00:00
"strconv"
"time"
2017-04-07 01:22:00 +00:00
"github.com/ouqiang/gocron/modules/logger"
2017-04-13 09:35:59 +00:00
"github.com/jakecoffman/cron"
"errors"
"fmt"
2017-04-28 03:54:46 +00:00
"github.com/ouqiang/gocron/modules/httpclient"
"github.com/ouqiang/gocron/modules/notify"
2017-05-04 10:05:25 +00:00
"sync"
2017-05-26 10:09:07 +00:00
rpcClient "github.com/ouqiang/gocron/modules/rpc/client"
pb "github.com/ouqiang/gocron/modules/rpc/proto"
2017-06-08 10:04:55 +00:00
"strings"
2017-03-10 09:24:06 +00:00
)
2017-05-04 10:05:25 +00:00
// 定时任务调度管理器
2017-04-13 09:35:59 +00:00
var Cron *cron.Cron
2017-05-04 10:05:25 +00:00
// 同一任务是否有实例处于运行中
var runInstance Instance
2017-05-04 10:05:25 +00:00
// 任务计数-正在运行中的任务
var TaskNum TaskCount
// 任务计数
type TaskCount struct {
num int
sync.RWMutex
}
func (c *TaskCount) Add() {
c.Lock()
defer c.Unlock()
c.num += 1
}
func (c *TaskCount) Done() {
c.Lock()
defer c.Unlock()
c.num -= 1
}
func (c *TaskCount) Num() int {
c.RLock()
defer c.RUnlock()
return c.num
}
2017-06-08 10:04:55 +00:00
// 任务ID作为Key
type Instance struct {
Status map[int]bool
2017-06-08 10:04:55 +00:00
sync.RWMutex
}
// 是否有任务处于运行中
func (i *Instance) has(key int) bool {
2017-06-08 10:04:55 +00:00
i.RLock()
defer i.RUnlock()
running, ok := i.Status[key]
if ok && running {
return true
}
return false
}
func (i *Instance) add(key int) {
2017-06-08 10:04:55 +00:00
i.Lock()
defer i.Unlock()
i.Status[key] = true
}
func (i *Instance) done(key int) {
2017-06-08 10:04:55 +00:00
i.Lock()
defer i.Unlock()
delete(i.Status, key)
}
2017-04-13 09:35:59 +00:00
2017-04-02 02:19:52 +00:00
type Task struct{}
2017-03-10 09:24:06 +00:00
type TaskResult struct {
Result string
Err error
RetryTimes int8
}
2017-03-24 09:55:44 +00:00
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
2017-04-02 02:19:52 +00:00
func (task *Task) Initialize() {
2017-04-13 09:35:59 +00:00
Cron = cron.New()
Cron.Start()
2017-06-08 10:04:55 +00:00
runInstance = Instance{make(map[int]bool), sync.RWMutex{}}
2017-05-04 10:05:25 +00:00
TaskNum = TaskCount{0, sync.RWMutex{}}
2017-04-02 02:38:49 +00:00
taskModel := new(models.Task)
taskList, err := taskModel.ActiveList()
if err != nil {
logger.Error("定时任务初始化#获取任务列表错误-", err.Error())
2017-04-02 02:38:49 +00:00
return
}
if len(taskList) == 0 {
2017-04-03 07:27:19 +00:00
logger.Debug("任务列表为空")
2017-04-02 02:38:49 +00:00
return
}
2017-04-20 01:36:42 +00:00
task.BatchAdd(taskList)
}
// 批量添加任务
func (task *Task) BatchAdd(tasks []models.TaskHost) {
for _, item := range tasks {
2017-05-29 09:05:21 +00:00
task.Add(item)
2017-04-02 02:38:49 +00:00
}
2017-03-10 09:24:06 +00:00
}
// 添加任务
2017-05-29 09:05:21 +00:00
func (task *Task) Add(taskModel models.TaskHost) {
2017-06-08 10:04:55 +00:00
if taskModel.Level == models.TaskLevelChild {
logger.Errorf("添加任务失败#不允许添加子任务到调度器#任务Id-%d", taskModel.Id);
return
}
taskFunc := createJob(taskModel)
2017-04-02 02:38:49 +00:00
if taskFunc == nil {
logger.Error("创建任务处理Job失败,不支持的任务协议#", taskModel.Protocol)
2017-04-02 02:38:49 +00:00
return
}
2017-04-13 09:35:59 +00:00
cronName := strconv.Itoa(taskModel.Id)
// Cron任务采用数组存储, 删除任务需遍历数组, 并对数组重新赋值, 任务较多时,有性能问题
2017-04-13 09:35:59 +00:00
Cron.RemoveJob(cronName)
err := Cron.AddFunc(taskModel.Spec, taskFunc, cronName)
2017-04-14 10:05:34 +00:00
if err != nil {
logger.Error("添加任务到调度器失败#", err)
}
2017-03-10 09:24:06 +00:00
}
2017-05-04 10:05:25 +00:00
// 停止所有任务
func (task *Task) StopAll() {
Cron.Stop()
}
2017-04-21 05:36:45 +00:00
// 直接运行任务
func (task *Task) Run(taskModel models.TaskHost) {
2017-05-29 09:05:21 +00:00
go createJob(taskModel)()
2017-04-21 05:36:45 +00:00
}
2017-03-10 09:24:06 +00:00
type Handler interface {
2017-05-29 09:05:21 +00:00
Run(taskModel models.TaskHost) (string, error)
2017-03-10 09:24:06 +00:00
}
2017-04-16 18:01:41 +00:00
2017-03-23 05:31:16 +00:00
// HTTP任务
2017-04-02 02:19:52 +00:00
type HTTPHandler struct{}
2017-03-10 09:24:06 +00:00
// http任务执行时间不超过300秒
const HttpExecTimeout = 300
2017-05-29 09:05:21 +00:00
func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error) {
if taskModel.Timeout <= 0 || taskModel.Timeout > HttpExecTimeout {
taskModel.Timeout = HttpExecTimeout
}
2017-04-28 03:54:46 +00:00
resp := httpclient.Get(taskModel.Command, taskModel.Timeout)
// 返回状态码非200均为失败
if resp.StatusCode != 200 {
2017-04-28 03:54:46 +00:00
return resp.Body, errors.New(fmt.Sprintf("HTTP状态码非200-->%d", resp.StatusCode))
2017-04-02 02:38:49 +00:00
}
2017-04-28 03:54:46 +00:00
return resp.Body, err
2017-03-10 09:24:06 +00:00
}
2017-05-26 10:09:07 +00:00
// RPC调用执行任务
type RPCHandler struct {}
2017-03-10 09:24:06 +00:00
2017-05-29 09:05:21 +00:00
func (h *RPCHandler) Run(taskModel models.TaskHost) (result string, err error) {
2017-05-26 10:09:07 +00:00
taskRequest := new(pb.TaskRequest)
taskRequest.Timeout = int32(taskModel.Timeout)
taskRequest.Command = taskModel.Command
2017-05-01 05:59:52 +00:00
2017-06-08 13:25:42 +00:00
return rpcClient.ExecWithRetry(taskModel.Name, taskModel.Port, taskRequest)
2017-03-23 05:31:16 +00:00
}
2017-05-26 10:09:07 +00:00
// 创建任务日志
2017-05-29 09:05:21 +00:00
func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, error) {
2017-04-02 02:38:49 +00:00
taskLogModel := new(models.TaskLog)
2017-04-14 10:05:34 +00:00
taskLogModel.TaskId = taskModel.Id
taskLogModel.Name = taskModel.Task.Name
2017-04-06 06:40:48 +00:00
taskLogModel.Spec = taskModel.Spec
taskLogModel.Protocol = taskModel.Protocol
taskLogModel.Command = taskModel.Command
taskLogModel.Timeout = taskModel.Timeout
2017-05-26 10:09:07 +00:00
if taskModel.Protocol == models.TaskRPC {
taskLogModel.Hostname = taskModel.Alias + "-" + taskModel.Name
}
2017-04-02 02:38:49 +00:00
taskLogModel.StartTime = time.Now()
taskLogModel.Status = status
2017-04-02 02:38:49 +00:00
insertId, err := taskLogModel.Create()
2017-03-23 05:31:16 +00:00
2017-05-26 10:09:07 +00:00
return insertId, err
2017-03-24 05:06:53 +00:00
}
// 更新任务日志
func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) {
2017-04-02 02:38:49 +00:00
taskLogModel := new(models.TaskLog)
var status models.Status
2017-04-21 09:41:59 +00:00
var result string = taskResult.Result
if taskResult.Err != nil {
2017-04-02 02:38:49 +00:00
status = models.Failure
2017-05-26 10:09:07 +00:00
} else {
2017-04-02 02:38:49 +00:00
status = models.Finish
}
return taskLogModel.Update(taskLogId, models.CommonMap{
2017-04-21 09:41:59 +00:00
"retry_times": taskResult.RetryTimes,
2017-04-02 02:38:49 +00:00
"status": status,
"result": result,
})
2017-03-23 05:31:16 +00:00
}
2017-05-29 09:05:21 +00:00
func createJob(taskModel models.TaskHost) cron.FuncJob {
var handler Handler = createHandler(taskModel)
if handler == nil {
return nil
}
2017-04-02 02:38:49 +00:00
taskFunc := func() {
2017-05-04 10:05:25 +00:00
TaskNum.Add()
defer TaskNum.Done()
2017-05-26 10:09:07 +00:00
taskLogId := beforeExecJob(taskModel)
2017-05-04 02:47:14 +00:00
if taskLogId <= 0 {
2017-04-02 02:38:49 +00:00
return
}
2017-05-08 06:07:06 +00:00
logger.Infof("开始执行任务#%s#命令-%s", taskModel.Task.Name, taskModel.Command)
taskResult := execJob(handler, taskModel)
2017-05-08 06:07:06 +00:00
logger.Infof("任务完成#%s#命令-%s", taskModel.Task.Name, taskModel.Command)
2017-05-04 02:47:14 +00:00
afterExecJob(taskModel, taskResult, taskLogId)
2017-04-02 02:38:49 +00:00
}
return taskFunc
2017-04-02 02:19:52 +00:00
}
2017-05-29 09:05:21 +00:00
func createHandler(taskModel models.TaskHost) Handler {
var handler Handler = nil
switch taskModel.Protocol {
case models.TaskHTTP:
handler = new(HTTPHandler)
2017-05-26 10:09:07 +00:00
case models.TaskRPC:
handler = new(RPCHandler)
}
2017-05-29 09:05:21 +00:00
return handler;
}
2017-06-08 10:04:55 +00:00
// 任务前置操作
2017-05-29 09:05:21 +00:00
func beforeExecJob(taskModel models.TaskHost) (taskLogId int64) {
2017-05-04 02:47:14 +00:00
if taskModel.Multi == 0 && runInstance.has(taskModel.Id) {
2017-05-26 10:09:07 +00:00
createTaskLog(taskModel, models.Cancel)
2017-05-04 02:47:14 +00:00
return
}
if taskModel.Multi == 0 {
runInstance.add(taskModel.Id)
}
2017-05-26 10:09:07 +00:00
taskLogId, err := createTaskLog(taskModel, models.Running)
2017-05-04 02:47:14 +00:00
if err != nil {
logger.Error("任务开始执行#写入任务日志失败-", err)
return
}
logger.Debugf("任务命令-%s", taskModel.Command)
2017-05-04 02:47:14 +00:00
return taskLogId
}
2017-06-08 10:04:55 +00:00
// 任务执行后置操作
2017-05-29 09:05:21 +00:00
func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId int64) {
2017-05-04 02:47:14 +00:00
if taskResult.Err != nil {
taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result
}
_, err := updateTaskLog(taskLogId, taskResult)
if err != nil {
logger.Error("任务结束#更新任务日志失败-", err)
}
2017-06-08 10:04:55 +00:00
// 发送邮件
go SendNotification(taskModel, taskResult)
// 执行依赖任务
go execDependencyTask(taskModel, taskResult)
}
// 执行依赖任务, 多个任务并发执行
func execDependencyTask(taskModel models.TaskHost, taskResult TaskResult) {
// 父任务才能执行子任务
if taskModel.Level != models.TaskLevelParent {
return
}
// 是否存在子任务
dependencyTaskId := strings.TrimSpace(taskModel.DependencyTaskId)
if dependencyTaskId == "" {
return
}
// 父子任务关系为强依赖, 父任务执行失败, 不执行依赖任务
if taskModel.DependencyStatus == models.TaskDependencyStatusStrong && taskResult.Err != nil {
logger.Infof("父子任务为强依赖关系, 父任务执行失败, 不运行依赖任务#主任务ID-%d", taskModel.Id)
return
}
// 获取子任务
model := new(models.Task)
tasks , err := model.GetDependencyTaskList(dependencyTaskId)
if err != nil {
logger.Errorf("获取依赖任务失败#主任务ID-%d#%s", taskModel.Id, err.Error())
return
}
if len(tasks) == 0 {
logger.Errorf("依赖任务列表为空#主任务ID-%d", taskModel.Id)
}
serviceTask := new(Task)
for _, task := range tasks {
task.Spec = fmt.Sprintf("依赖任务(主任务ID-%d)", taskModel.Id)
serviceTask.Run(task)
}
2017-05-04 02:47:14 +00:00
}
// 发送任务结果通知
2017-05-29 09:05:21 +00:00
func SendNotification(taskModel models.TaskHost, taskResult TaskResult) {
2017-05-04 02:47:14 +00:00
var statusName string
// 未开启通知
if taskModel.NotifyStatus == 0 {
return
}
if taskModel.NotifyStatus == 1 && taskResult.Err == nil {
// 执行失败才发送通知
return
}
2017-05-10 09:58:05 +00:00
if taskModel.NotifyReceiverId == "" {
return
}
2017-05-04 02:47:14 +00:00
if taskResult.Err != nil {
statusName = "失败"
} else {
statusName = "成功"
}
// 发送通知
msg := notify.Message{
"task_type": taskModel.NotifyType,
"task_receiver_id": taskModel.NotifyReceiverId,
"name": taskModel.Task.Name,
"output": taskResult.Result,
"status": statusName,
"taskId": taskModel.Id,
};
notify.Push(msg)
}
// 执行具体任务
2017-05-29 09:05:21 +00:00
func execJob(handler Handler, taskModel models.TaskHost) TaskResult {
2017-05-29 07:30:59 +00:00
defer func() {
if err := recover(); err != nil {
logger.Error("panic#service/task.go:execJob#", err)
}
} ()
2017-05-04 02:47:14 +00:00
if taskModel.Multi == 0 {
defer runInstance.done(taskModel.Id)
}
// 默认只运行任务一次
var execTimes int8 = 1
if (taskModel.RetryTimes > 0) {
execTimes += taskModel.RetryTimes
}
var i int8 = 0
var output string
var err error
for i < execTimes {
output, err = handler.Run(taskModel)
if err == nil {
return TaskResult{Result: output, Err: err, RetryTimes: i}
}
i++
if i < execTimes {
2017-04-21 09:41:59 +00:00
logger.Warnf("任务执行失败#任务id-%d#重试第%d次#输出-%s#错误-%s", taskModel.Id, i, output, err.Error())
// 重试间隔时间每次递增1分钟
time.Sleep( time.Duration(i) * time.Minute)
}
}
return TaskResult{Result: output, Err: err, RetryTimes: taskModel.RetryTimes}
}