diff --git a/README.md b/README.md index 92c8760..4a623b3 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,7 @@ * 任务执行失败重试设置 * 任务超时设置 * 延时任务 +* 任务依赖 * 任务类型 * shell任务 > 在远程服务器上执行shell命令 @@ -65,6 +66,7 @@ ### 命令 * gocron web + * --host 默认0.0.0.0 * -p 端口, 指定端口, 默认5920 * -e 指定运行环境, dev|test|prod, dev模式下可查看更多日志信息, 默认prod * -d 后台运行 diff --git a/cmd/web.go b/cmd/web.go index 111fbf8..357d079 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -31,6 +31,11 @@ var CmdWeb = cli.Command{ Usage: "run web server", Action: runWeb, Flags: []cli.Flag{ + cli.StringFlag{ + Name: "host", + Value: "0.0.0.0", + Usage: "bind host", + }, cli.IntFlag{ Name: "port,p", Value: DefaultPort, @@ -66,9 +71,10 @@ func runWeb(ctx *cli.Context) { routers.Register(m) // 注册中间件. routers.RegisterMiddleware(m) + host := parseHost(ctx) port := parsePort(ctx) fmt.Println("server start") - m.Run(port) + m.Run(host, port) } func becomeDaemon(ctx *cli.Context) { @@ -151,6 +157,14 @@ func parsePort(ctx *cli.Context) int { return port } +func parseHost(ctx *cli.Context) string { + if ctx.IsSet("host") { + return ctx.String("host") + } + + return "0.0.0.0" +} + func setEnvironment(ctx *cli.Context) { var env string = "prod" if ctx.IsSet("env") { diff --git a/models/model.go b/models/model.go index 768866b..d0ada72 100644 --- a/models/model.go +++ b/models/model.go @@ -10,6 +10,7 @@ import ( "github.com/ouqiang/gocron/modules/logger" "github.com/ouqiang/gocron/modules/app" "strconv" + "time" ) type Status int8 @@ -93,6 +94,7 @@ func CreateDb() *xorm.Engine { engine.Logger().SetLevel(core.LOG_DEBUG) } + go keepDbAlived(engine) return engine } @@ -138,4 +140,12 @@ func getDbConfig() map[string]string { db["max_open_conns"] = app.Setting.Key("db.max.open.conns").String() return db +} + +func keepDbAlived(engine *xorm.Engine) { + t := time.Tick(180 * time.Second) + for { + <- t + engine.Ping() + } } \ No newline at end of file diff --git a/models/task.go b/models/task.go index 80895be..fb09f7b 100644 --- a/models/task.go +++ b/models/task.go @@ -4,6 +4,7 @@ import ( "time" "github.com/go-xorm/xorm" "errors" + "strings" ) type TaskProtocol int8 @@ -13,22 +14,39 @@ const ( TaskRPC // RPC方式执行命令 ) +type TaskLevel int8 + +const ( + TaskLevelParent TaskLevel = 1 // 父任务 + TaskLevelChild TaskLevel = 2 // 子任务(依赖任务) +) + +type TaskDependencyStatus int8 + +const ( + TaskDependencyStatusStrong TaskDependencyStatus = 1 // 强依赖 + TaskDependencyStatusWeak TaskDependencyStatus = 2 // 弱依赖 +) + // 任务 type Task struct { Id int `xorm:"int pk autoincr"` Name string `xorm:"varchar(32) notnull"` // 任务名称 + Level TaskLevel `xorm:"smallint notnull index default 1"` // 任务等级 1: 主任务 2: 依赖任务 + DependencyTaskId string `xorm:"varchar(64) notnull default ''"` // 依赖任务ID,多个ID逗号分隔 + DependencyStatus TaskDependencyStatus `xorm:"smallint notnull default 1"` // 依赖关系 1:强依赖 主任务执行成功, 依赖任务才会被执行 2:弱依赖 Spec string `xorm:"varchar(64) notnull"` // crontab - Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:系统命令 + Protocol TaskProtocol `xorm:"tinyint notnull index"` // 协议 1:http 2:系统命令 Command string `xorm:"varchar(256) notnull"` // URL地址或shell命令 Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制 Multi int8 `xorm:"tinyint notnull default 1"` // 是否允许多实例运行 RetryTimes int8 `xorm:"tinyint notnull default 0"` // 重试次数 - HostId int16 `xorm:"smallint notnull default 0"` // RPC host id, + HostId int16 `xorm:"smallint notnull index default 0"` // RPC host id, NotifyStatus int8 `xorm:"smallint notnull default 1"` // 任务执行结束是否通知 0: 不通知 1: 失败通知 2: 执行结束通知 NotifyType int8 `xorm:"smallint notnull default 0"` // 通知类型 1: 邮件 2: slack NotifyReceiverId string `xorm:"varchar(256) notnull default '' "` // 通知接受者ID, setting表主键ID,多个ID逗号分隔 Remark string `xorm:"varchar(100) notnull default ''"` // 备注 - Status Status `xorm:"tinyint notnull default 0"` // 状态 1:正常 0:停止 + Status Status `xorm:"tinyint notnull index default 0"` // 状态 1:正常 0:停止 Created time.Time `xorm:"datetime notnull created"` // 创建时间 Deleted time.Time `xorm:"datetime deleted"` // 删除时间 BaseModel `xorm:"-"` @@ -59,6 +77,7 @@ func (task *Task) Create() (insertId int, err error) { func (task *Task) CreateTestTask() { // HTTP任务 task.Name = "测试HTTP任务" + task.Level = TaskLevelParent task.Protocol = TaskHTTP task.Spec = "*/30 * * * * *" // 查询IP地址区域信息 @@ -68,7 +87,9 @@ func (task *Task) CreateTestTask() { } func (task *Task) UpdateBean(id int) (int64, error) { - return Db.ID(id).Cols("name,spec,protocol,command,timeout,multi,retry_times,host_id,remark,notify_status,notify_type,notify_receiver_id").Update(task) + return Db.ID(id). + Cols("name,spec,protocol,command,timeout,multi,retry_times,host_id,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status"). + Update(task) } // 更新 @@ -95,7 +116,11 @@ func (task *Task) Enable(id int) (int64, error) { func (task *Task) ActiveList() ([]TaskHost, error) { list := make([]TaskHost, 0) fields := "t.*, host.alias,host.name,host.port" - err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.status = ?", Enabled).Cols(fields).Find(&list) + err := Db.Alias("t"). + Join("LEFT", hostTableName(), "t.host_id=host.id"). + Where("t.status = ? AND t.level = ?", Enabled, TaskLevelParent). + Cols(fields). + Find(&list) return list, err } @@ -104,7 +129,11 @@ func (task *Task) ActiveList() ([]TaskHost, error) { func (task *Task) ActiveListByHostId(hostId int16) ([]TaskHost, error) { list := make([]TaskHost, 0) fields := "t.*, host.alias,host.name,host.port" - err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.status = ? AND t.host_id = ?", Enabled, hostId).Cols(fields).Find(&list) + err := Db.Alias("t"). + Join("LEFT", hostTableName(), "t.host_id=host.id"). + Where("t.status = ? AND t.host_id = ? AND t.level = ?", Enabled, hostId, TaskLevelParent). + Cols(fields). + Find(&list) return list, err } @@ -158,6 +187,28 @@ func (task *Task) List(params CommonMap) ([]TaskHost, error) { return list, err } +// 获取依赖任务列表 +func (task *Task) GetDependencyTaskList(ids string) ([]TaskHost, error) { + list := make([]TaskHost, 0) + if ids == "" { + return list, nil + } + idList := strings.Split(ids, ",") + taskIds := make([]interface{}, len(idList)) + for i, v := range idList { + taskIds[i] = v + } + fields := "t.*, host.alias,host.name,host.port" + err := Db.Alias("t"). + Join("LEFT", hostTableName(), "t.host_id=host.id"). + Where("t.level = ?", TaskLevelChild). + In("t.id", taskIds). + Cols(fields). + Find(&list) + + return list, err +} + func (task *Task) Total(params CommonMap) (int64, error) { session := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id") task.parseWhere(session, params) diff --git a/routers/task/task.go b/routers/task/task.go index 2f32b75..74688de 100644 --- a/routers/task/task.go +++ b/routers/task/task.go @@ -18,8 +18,11 @@ import ( type TaskForm struct { Id int + Level models.TaskLevel `binding:"Required;In(1,2)"` + DependencyStatus models.TaskDependencyStatus + DependencyTaskId string Name string `binding:"Required;MaxSize(32)"` - Spec string `binding:"Required;MaxSize(64)"` + Spec string Protocol models.TaskProtocol `binding:"In(1,2)"` Command string `binding:"Required;MaxSize(256)"` Timeout int `binding:"Range(0,86400)"` @@ -99,10 +102,6 @@ func Store(ctx *macaron.Context, form TaskForm) string { json := utils.JsonResponse{} taskModel := models.Task{} var id int = form.Id - _, err := cron.Parse(form.Spec) - if err != nil { - return json.CommonFailure("crontab表达式解析失败", err) - } nameExists, err := taskModel.NameExist(form.Name, form.Id) if err != nil { return json.CommonFailure(utils.FailureContent, err) @@ -134,8 +133,11 @@ func Store(ctx *macaron.Context, form TaskForm) string { taskModel.NotifyType = form.NotifyType - 1 taskModel.NotifyReceiverId = form.NotifyReceiverId taskModel.Spec = form.Spec + taskModel.Level = form.Level + taskModel.DependencyStatus = form.DependencyStatus + taskModel.DependencyTaskId = strings.TrimSpace(form.DependencyTaskId) if taskModel.NotifyStatus > 0 && taskModel.NotifyReceiverId == "" { - return json.CommonFailure("请至少选择一个接收者") + return json.CommonFailure("至少选择一个通知接收者") } if taskModel.Protocol == models.TaskHTTP { command := strings.ToLower(taskModel.Command) @@ -151,6 +153,27 @@ func Store(ctx *macaron.Context, form TaskForm) string { return json.CommonFailure("任务重试次数取值0-10") } + if (taskModel.DependencyStatus != models.TaskDependencyStatusStrong && + taskModel.DependencyStatus != models.TaskDependencyStatusWeak) { + return json.CommonFailure("请选择依赖关系") + } + + if taskModel.Level == models.TaskLevelParent { + _, err = cron.Parse(form.Spec) + if err != nil { + return json.CommonFailure("crontab表达式解析失败", err) + } + } else { + taskModel.DependencyTaskId = "" + taskModel.Spec = "" + } + + if id > 0 && taskModel.DependencyTaskId != "" { + dependencyTaskIds := strings.Split(taskModel.DependencyTaskId, ",") + if utils.InStringSlice(dependencyTaskIds, strconv.Itoa(id)) { + return json.CommonFailure("不允许设置当前任务为子任务") + } + } if id == 0 { // 任务添加后开始调度执行 @@ -165,7 +188,7 @@ func Store(ctx *macaron.Context, form TaskForm) string { } status, err := taskModel.GetStatus(id) - if status == models.Enabled { + if status == models.Enabled && taskModel.Level == models.TaskLevelParent { addTaskToTimer(id) } diff --git a/service/task.go b/service/task.go index e11298f..f63142c 100644 --- a/service/task.go +++ b/service/task.go @@ -13,6 +13,7 @@ import ( "sync" rpcClient "github.com/ouqiang/gocron/modules/rpc/client" pb "github.com/ouqiang/gocron/modules/rpc/proto" + "strings" ) // 定时任务调度管理器 @@ -47,13 +48,16 @@ func (c *TaskCount) Num() int { return c.num } -// 任务ID作为Key, 不会出现并发写, 不加锁 +// 任务ID作为Key type Instance struct { Status map[int]bool + sync.RWMutex } // 是否有任务处于运行中 func (i *Instance) has(key int) bool { + i.RLock() + defer i.RUnlock() running, ok := i.Status[key] if ok && running { return true @@ -63,11 +67,15 @@ func (i *Instance) has(key int) bool { } func (i *Instance) add(key int) { + i.Lock() + defer i.Unlock() i.Status[key] = true } func (i *Instance) done(key int) { - i.Status[key] = false + i.Lock() + defer i.Unlock() + delete(i.Status, key) } type Task struct{} @@ -82,7 +90,7 @@ type TaskResult struct { func (task *Task) Initialize() { Cron = cron.New() Cron.Start() - runInstance = Instance{make(map[int]bool)} + runInstance = Instance{make(map[int]bool), sync.RWMutex{}} TaskNum = TaskCount{0, sync.RWMutex{}} taskModel := new(models.Task) @@ -107,6 +115,10 @@ func (task *Task) BatchAdd(tasks []models.TaskHost) { // 添加任务 func (task *Task) Add(taskModel models.TaskHost) { + if taskModel.Level == models.TaskLevelChild { + logger.Errorf("添加任务失败#不允许添加子任务到调度器#任务Id-%d", taskModel.Id); + return + } taskFunc := createJob(taskModel) if taskFunc == nil { logger.Error("创建任务处理Job失败,不支持的任务协议#", taskModel.Protocol) @@ -239,6 +251,7 @@ func createHandler(taskModel models.TaskHost) Handler { return handler; } +// 任务前置操作 func beforeExecJob(taskModel models.TaskHost) (taskLogId int64) { if taskModel.Multi == 0 && runInstance.has(taskModel.Id) { createTaskLog(taskModel, models.Cancel) @@ -258,6 +271,7 @@ func beforeExecJob(taskModel models.TaskHost) (taskLogId int64) { return taskLogId } +// 任务执行后置操作 func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId int64) { if taskResult.Err != nil { taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result @@ -267,7 +281,47 @@ func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId in logger.Error("任务结束#更新任务日志失败-", err) } - SendNotification(taskModel, taskResult) + // 发送邮件 + go SendNotification(taskModel, taskResult) + // 执行依赖任务 + go execDependencyTask(taskModel, taskResult) +} + +// 执行依赖任务, 多个任务并发执行 +func execDependencyTask(taskModel models.TaskHost, taskResult TaskResult) { + // 父任务才能执行子任务 + if taskModel.Level != models.TaskLevelParent { + return + } + + // 是否存在子任务 + dependencyTaskId := strings.TrimSpace(taskModel.DependencyTaskId) + if dependencyTaskId == "" { + return + } + + // 父子任务关系为强依赖, 父任务执行失败, 不执行依赖任务 + if taskModel.DependencyStatus == models.TaskDependencyStatusStrong && taskResult.Err != nil { + logger.Infof("父子任务为强依赖关系, 父任务执行失败, 不运行依赖任务#主任务ID-%d", taskModel.Id) + return + } + + // 获取子任务 + model := new(models.Task) + tasks , err := model.GetDependencyTaskList(dependencyTaskId) + if err != nil { + logger.Errorf("获取依赖任务失败#主任务ID-%d#%s", taskModel.Id, err.Error()) + return + } + if len(tasks) == 0 { + logger.Errorf("依赖任务列表为空#主任务ID-%d", taskModel.Id) + } + + serviceTask := new(Task) + for _, task := range tasks { + task.Spec = fmt.Sprintf("依赖任务(主任务ID-%d)", taskModel.Id) + serviceTask.Run(task) + } } // 发送任务结果通知 diff --git a/templates/task/index.html b/templates/task/index.html index 158763d..8eb1a03 100644 --- a/templates/task/index.html +++ b/templates/task/index.html @@ -54,6 +54,7 @@ 任务ID 任务名称 + 任务类型 cron表达式 执行方式 超时时间 @@ -69,20 +70,27 @@ {{{.Id}}} {{{.Task.Name}}} + {{{if eq .Level 1}}}主任务{{{else}}}子任务{{{end}}} {{{.Spec}}} {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}} {{{if eq .Timeout -1}}}后台运行{{{else if gt .Timeout 0}}}{{{.Timeout}}}秒{{{else}}}不限制{{{end}}} {{{.RetryTimes}}} {{{if gt .Multi 0}}}否{{{else}}}是{{{end}}} {{{.Alias}}}-{{{.Name}}} - {{{if eq .Status 1}}}激活{{{else}}}停止{{{end}}} + + {{{if eq .Level 1}}} + {{{if eq .Status 1}}}激活{{{else}}}停止{{{end}}} + {{{end}}} +
编辑 - {{{if eq .Status 1}}} - - {{{else}}} - + {{{if eq .Level 1}}} + {{{if eq .Status 1}}} + + {{{else}}} + + {{{end}}} {{{end}}}
@@ -97,7 +105,6 @@
-