diff --git a/README.md b/README.md index 6137b21..168f871 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ * 任务依赖配置 * 任务类型 * shell任务 - > 在任务节点上执行shell命令 + > 在任务节点上执行shell命令, 支持任务同时在多个节点上运行 * HTTP任务 > 访问指定的URL地址, 由调度器直接执行, 不依赖任务节点 * 查看任务执行日志 @@ -33,7 +33,7 @@ ## 下载 -[v1.0](https://github.com/ouqiang/gocron/releases/tag/v1.0) +[v1.1](https://github.com/ouqiang/gocron/releases/tag/v1.1) ## 安装 @@ -45,9 +45,9 @@ * 调度器启动 * Windows: `gocron.exe web` * Linux、Mac OS: `./gocron web` -* 任务节点启动 - * Windows: `gocron-node.exe ip:port (默认0.0.0.0:5921)` - * Linux、Mac OS: `./gocron-node ip:port (默认0.0.0.0:5921)` +* 任务节点启动, 默认监听0.0.0.0:5921 + * Windows: `gocron-node.exe` + * Linux、Mac OS: `./gocron-node` 4. 浏览器访问 http://localhost:5920 ### 源码安装 @@ -64,12 +64,13 @@ * --host 默认0.0.0.0 * -p 端口, 指定端口, 默认5920 * -e 指定运行环境, dev|test|prod, dev模式下可查看更多日志信息, 默认prod - * -d 后台运行 * -h 查看帮助 -* gocron-node ip:port, 默认0.0.0.0:5921 +* gocron-node + * -allow-root *nix平台允许以root用户运行 + * -s ip:port 监听地址 ## 程序使用的组件 -* web框架 [Macaron](http://go-macaron.com/) +* Web框架 [Macaron](http://go-macaron.com/) * 定时任务调度 [Cron](https://github.com/robfig/cron) * ORM [Xorm](https://github.com/go-xorm/xorm) * UI框架 [Semantic UI](https://semantic-ui.com/) @@ -78,3 +79,14 @@ ## 反馈 提交[issue](https://github.com/ouqiang/gocron/issues/new) + +## ChangeLog + +v1.1 +-------- + +* 任务可同时在多个节点上运行 +* *nix平台默认禁止以root用户运行任务节点 +* 子任务命令中增加预定义占位符, 子任务可根据主任务运行结果执行相应操作 +* 删除守护进程模块 +* Web访问日志输出到终端 diff --git a/cmd/web.go b/cmd/web.go index 7800c1a..def8ad4 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -40,10 +40,6 @@ var CmdWeb = cli.Command{ Value: "prod", Usage: "runtime environment, dev|test|prod", }, - cli.BoolFlag{ - Name: "d", - Usage: "-d=true, run as daemon process", - }, }, } @@ -155,8 +151,6 @@ func shutdown() { // 停止所有任务调度 logger.Info("停止定时任务调度") serviceTask.StopAll() - // 释放gRPC连接池 - grpcpool.Pool.ReleaseAll() taskNumInRunning := service.TaskNum.Num() logger.Infof("正在运行的任务有%d个", taskNumInRunning) @@ -170,4 +164,7 @@ func shutdown() { time.Sleep(3 * time.Second) taskNumInRunning = service.TaskNum.Num() } + + // 释放gRPC连接池 + grpcpool.Pool.ReleaseAll() } \ No newline at end of file diff --git a/gocron.go b/gocron.go index b988da9..cc58076 100644 --- a/gocron.go +++ b/gocron.go @@ -10,7 +10,7 @@ import ( "github.com/ouqiang/gocron/cmd" ) -const AppVersion = "1.0" +const AppVersion = "1.1" func main() { app := cli.NewApp() diff --git a/models/host.go b/models/host.go index d9ea5a6..cd13247 100644 --- a/models/host.go +++ b/models/host.go @@ -12,6 +12,7 @@ type Host struct { Port int `xorm:"notnull default 22"` // 主机端口 Remark string `xorm:"varchar(100) notnull default '' "` // 备注 BaseModel `xorm:"-"` + Selected bool `xorm:"-"` } // 新增 diff --git a/models/migration.go b/models/migration.go index 9ab26ab..06c728e 100644 --- a/models/migration.go +++ b/models/migration.go @@ -15,7 +15,7 @@ func (migration *Migration) Exec(dbName string) error { setting := new(Setting) task := new(Task) tables := []interface{}{ - &User{}, task, &TaskLog{}, &Host{}, setting,&LoginLog{}, + &User{}, task, &TaskLog{}, &Host{}, setting,&LoginLog{},&TaskHost{}, } for _, table := range tables { exist, err:= Db.IsTableExist(table) diff --git a/models/model.go b/models/model.go index d0ada72..f5c5c20 100644 --- a/models/model.go +++ b/models/model.go @@ -54,8 +54,8 @@ func (model *BaseModel) parsePageAndPageSize(params CommonMap) { if model.Page <= 0 { model.Page = Page } - if model.PageSize <= 0 || model.PageSize > MaxPageSize { - model.PageSize = PageSize + if model.PageSize <= 0 { + model.PageSize = MaxPageSize } } diff --git a/models/task.go b/models/task.go index fb09f7b..1572743 100644 --- a/models/task.go +++ b/models/task.go @@ -41,7 +41,6 @@ type Task struct { Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制 Multi int8 `xorm:"tinyint notnull default 1"` // 是否允许多实例运行 RetryTimes int8 `xorm:"tinyint notnull default 0"` // 重试次数 - HostId int16 `xorm:"smallint notnull index default 0"` // RPC host id, NotifyStatus int8 `xorm:"smallint notnull default 1"` // 任务执行结束是否通知 0: 不通知 1: 失败通知 2: 执行结束通知 NotifyType int8 `xorm:"smallint notnull default 0"` // 通知类型 1: 邮件 2: slack NotifyReceiverId string `xorm:"varchar(256) notnull default '' "` // 通知接受者ID, setting表主键ID,多个ID逗号分隔 @@ -50,17 +49,11 @@ type Task struct { Created time.Time `xorm:"datetime notnull created"` // 创建时间 Deleted time.Time `xorm:"datetime deleted"` // 删除时间 BaseModel `xorm:"-"` + Hosts []TaskHostDetail `xorm:"-"` } -type TaskHost struct { - Task `xorm:"extends"` - Name string - Port int - Alias string -} - -func (TaskHost) TableName() string { - return TablePrefix + "task" +func taskHostTableName() []string { + return []string{TablePrefix + "task_host", "th"} } // 新增 @@ -88,7 +81,7 @@ func (task *Task) CreateTestTask() { func (task *Task) UpdateBean(id int) (int64, error) { return Db.ID(id). - Cols("name,spec,protocol,command,timeout,multi,retry_times,host_id,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status"). + Cols("name,spec,protocol,command,timeout,multi,retry_times,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status"). Update(task) } @@ -113,36 +106,48 @@ func (task *Task) Enable(id int) (int64, error) { } // 获取所有激活任务 -func (task *Task) ActiveList() ([]TaskHost, error) { - list := make([]TaskHost, 0) - fields := "t.*, host.alias,host.name,host.port" - err := Db.Alias("t"). - Join("LEFT", hostTableName(), "t.host_id=host.id"). - Where("t.status = ? AND t.level = ?", Enabled, TaskLevelParent). - Cols(fields). +func (task *Task) ActiveList() ([]Task, error) { + list := make([]Task, 0) + err := Db.Where("status = ? AND level = ?", Enabled, TaskLevelParent). Find(&list) - return list, err + if err != nil { + return list, err + } + + return task.setHostsForTasks(list) } // 获取某个主机下的所有激活任务 -func (task *Task) ActiveListByHostId(hostId int16) ([]TaskHost, error) { - list := make([]TaskHost, 0) - fields := "t.*, host.alias,host.name,host.port" - err := Db.Alias("t"). - Join("LEFT", hostTableName(), "t.host_id=host.id"). - Where("t.status = ? AND t.host_id = ? AND t.level = ?", Enabled, hostId, TaskLevelParent). - Cols(fields). +func (task *Task) ActiveListByHostId(hostId int16) ([]Task, error) { + taskHostModel := new(TaskHost) + taskIds, err := taskHostModel.GetTaskIdsByHostId(hostId) + if err != nil { + return nil, err + } + list := make([]Task, 0) + err = Db.Where("status = ? AND level = ?", Enabled, TaskLevelParent). + In("id", taskIds...). Find(&list) + if err != nil { + return list, err + } - return list, err + return task.setHostsForTasks(list) } -// 判断主机id是否有引用 -func (task *Task) HostIdExist(hostId int16) (bool, error) { - count, err := Db.Where("host_id = ?", hostId).Count(task); +func (task *Task) setHostsForTasks(tasks []Task) ([]Task, error) { + taskHostModel := new(TaskHost) + var err error + for i, value := range tasks { + taskHostDetails, err := taskHostModel.GetHostIdsByTaskId(value.Id) + if err != nil { + return nil, err + } + tasks[i].Hosts = taskHostDetails + } - return count > 0, err + return tasks, err } // 判断任务名称是否存在 @@ -168,28 +173,37 @@ func (task *Task) GetStatus(id int) (Status, error) { return task.Status, nil } -func(task *Task) Detail(id int) (TaskHost, error) { - taskHost := TaskHost{} - fields := "t.*, host.alias,host.name,host.port" - _, err := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id").Where("t.id=?", id).Cols(fields).Get(&taskHost) +func(task *Task) Detail(id int) (Task, error) { + t := Task{} + _, err := Db.Where("id=?", id).Get(&t) - return taskHost, err + if err != nil { + return t, err + } + + taskHostModel := new(TaskHost) + t.Hosts, err = taskHostModel.GetHostIdsByTaskId(id) + + return t, err } -func (task *Task) List(params CommonMap) ([]TaskHost, error) { +func (task *Task) List(params CommonMap) ([]Task, error) { task.parsePageAndPageSize(params) - list := make([]TaskHost, 0) - fields := "t.*, host.alias,host.name" - session := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id") + list := make([]Task, 0) + session := Db.Alias("t").Join("LEFT", taskHostTableName(), "t.id = th.task_id") task.parseWhere(session, params) - err := session.Cols(fields).Desc("t.id").Limit(task.PageSize, task.pageLimitOffset()).Find(&list) + err := session.GroupBy("t.id").Desc("t.id").Cols("t.*").Limit(task.PageSize, task.pageLimitOffset()).Find(&list) - return list, err + if err != nil { + return nil, err + } + + return task.setHostsForTasks(list) } // 获取依赖任务列表 -func (task *Task) GetDependencyTaskList(ids string) ([]TaskHost, error) { - list := make([]TaskHost, 0) +func (task *Task) GetDependencyTaskList(ids string) ([]Task, error) { + list := make([]Task, 0) if ids == "" { return list, nil } @@ -198,21 +212,24 @@ func (task *Task) GetDependencyTaskList(ids string) ([]TaskHost, error) { for i, v := range idList { taskIds[i] = v } - fields := "t.*, host.alias,host.name,host.port" + fields := "t.*" err := Db.Alias("t"). - Join("LEFT", hostTableName(), "t.host_id=host.id"). Where("t.level = ?", TaskLevelChild). In("t.id", taskIds). Cols(fields). Find(&list) - return list, err + if err != nil { + return list, err + } + + return task.setHostsForTasks(list) } func (task *Task) Total(params CommonMap) (int64, error) { - session := Db.Alias("t").Join("LEFT", hostTableName(), "t.host_id=host.id") + session := Db.Alias("t").Join("LEFT", taskHostTableName(), "t.id = th.task_id") task.parseWhere(session, params) - return session.Count(task) + return session.GroupBy("t.id").Count(task) } // 解析where @@ -226,7 +243,7 @@ func (task *Task) parseWhere(session *xorm.Session, params CommonMap) { } hostId, ok := params["HostId"] if ok && hostId.(int) > 0 { - session.And("host_id = ?", hostId) + session.And("th.host_id = ?", hostId) } name, ok := params["Name"] if ok && name.(string) != "" { @@ -242,6 +259,3 @@ func (task *Task) parseWhere(session *xorm.Session, params CommonMap) { } } -func hostTableName() []string { - return []string{TablePrefix + "host", "host"} -} \ No newline at end of file diff --git a/models/task_host.go b/models/task_host.go new file mode 100644 index 0000000..5d1015d --- /dev/null +++ b/models/task_host.go @@ -0,0 +1,81 @@ +package models + + +type TaskHost struct { + Id int `xorm:"int pk autoincr"` + TaskId int `xorm:"int not null index"` + HostId int16 `xorm:"smallint not null index"` +} + +type TaskHostDetail struct { + TaskHost `xorm:"extends"` + Name string + Port int + Alias string +} + +func (TaskHostDetail) TableName() string { + return TablePrefix + "task_host" +} + +func hostTableName() []string { + return []string{TablePrefix + "host", "h"} +} + +func (th *TaskHost) Remove(taskId int) error { + _, err := Db.Where("task_id = ?", taskId).Delete(new(TaskHost)) + + return err +} + +func (th *TaskHost) Add(taskId int, hostIds []int) error { + + err := th.Remove(taskId) + if err != nil { + return err + } + + taskHosts := make([]TaskHost, len(hostIds)) + for i, value := range hostIds { + taskHosts[i].TaskId = taskId + taskHosts[i].HostId = int16(value) + } + + _, err = Db.Insert(&taskHosts) + + return err +} + +func (th *TaskHost) GetHostIdsByTaskId(taskId int) ([]TaskHostDetail, error) { + list := make([]TaskHostDetail, 0) + fields := "th.id,th.host_id,h.alias,h.name,h.port" + err := Db.Alias("th"). + Join("LEFT", hostTableName(), "th.host_id=h.id"). + Where("th.task_id = ?", taskId). + Cols(fields). + Find(&list) + + return list, err +} + +func (th *TaskHost) GetTaskIdsByHostId(hostId int16) ([]interface{}, error) { + list := make([]TaskHost, 0) + err := Db.Where("host_id = ?", hostId).Cols("task_id").Find(&list) + if err != nil { + return nil, err + } + + taskIds := make([]interface{}, len(list)) + for i, value := range list { + taskIds[i] = value.TaskId + } + + return taskIds, err +} + +// 判断主机id是否有引用 +func (th *TaskHost) HostIdExist(hostId int16) (bool, error) { + count, err := Db.Where("host_id = ?", hostId).Count(th); + + return count > 0, err +} \ No newline at end of file diff --git a/routers/host/host.go b/routers/host/host.go index e843d64..9625df4 100644 --- a/routers/host/host.go +++ b/routers/host/host.go @@ -117,7 +117,7 @@ func Store(ctx *macaron.Context, form HostForm) string { grpcpool.Pool.Release(oldAddr) } - taskModel := new(models.TaskHost) + taskModel := new(models.Task) tasks, err := taskModel.ActiveListByHostId(id) if err != nil { return json.CommonFailure("刷新任务主机信息失败", err) @@ -135,8 +135,8 @@ func Remove(ctx *macaron.Context) string { if err != nil { return json.CommonFailure("参数错误", err) } - taskModel := new(models.Task) - exist,err := taskModel.HostIdExist(int16(id)) + taskHostModel := new(models.TaskHost) + exist,err := taskHostModel.HostIdExist(int16(id)) if err != nil { return json.CommonFailure("操作失败", err) } diff --git a/routers/routers.go b/routers/routers.go index 17f8fb5..9bd00c1 100644 --- a/routers/routers.go +++ b/routers/routers.go @@ -19,6 +19,7 @@ import ( "github.com/ouqiang/gocron/routers/loginlog" "time" "strconv" + "html/template" ) // 静态文件目录 @@ -138,6 +139,14 @@ func RegisterMiddleware(m *macaron.Macaron) { IndentJSON: true, // 渲染具有缩进格式的 XML,默认为不缩进 IndentXML: true, + Funcs: []template.FuncMap{map[string]interface{} { + "HostFormat": func(index int) bool { + return (index + 1) % 3 == 0 + }, + "unescape": func(str string) template.HTML { + return template.HTML(str) + }, + }}, })) m.Use(session.Sessioner(session.Options{ Provider: "file", diff --git a/routers/task/task.go b/routers/task/task.go index 74688de..9958f16 100644 --- a/routers/task/task.go +++ b/routers/task/task.go @@ -28,7 +28,7 @@ type TaskForm struct { Timeout int `binding:"Range(0,86400)"` Multi int8 `binding:"In(1,2)"` RetryTimes int8 - HostId int16 + HostId string Remark string NotifyStatus int8 `binding:"In(1,2,3)"` NotifyType int8 `binding:"In(1,2,3)"` @@ -91,9 +91,22 @@ func Edit(ctx *macaron.Context) { logger.Errorf("编辑任务#获取任务详情失败#任务ID-%d#%s", id, err.Error()) ctx.Redirect("/task") } + hostModel := new(models.Host) + hostModel.PageSize = -1 + hosts, err := hostModel.List(models.CommonMap{}) + if err != nil { + logger.Error(err) + } else { + for i, host := range(hosts) { + if inHosts(task.Hosts, host.Id) { + hosts[i].Selected = true + } + } + } + ctx.Data["Task"] = task + ctx.Data["Hosts"] = hosts ctx.Data["Title"] = "编辑" - setHostsToTemplate(ctx) ctx.HTML(200, "task/task_form") } @@ -110,15 +123,10 @@ func Store(ctx *macaron.Context, form TaskForm) string { return json.CommonFailure("任务名称已存在") } - if form.Protocol == models.TaskRPC && form.HostId <= 0 { + if form.Protocol == models.TaskRPC && form.HostId == "" { return json.CommonFailure("请选择主机名") } - if form.Protocol == models.TaskRPC { - taskModel.HostId = form.HostId - } else { - taskModel.HostId = 0 - } taskModel.Name = form.Name taskModel.Protocol = form.Protocol taskModel.Command = form.Command @@ -187,6 +195,18 @@ func Store(ctx *macaron.Context, form TaskForm) string { return json.CommonFailure("保存失败", err) } + taskHostModel := new(models.TaskHost) + if form.Protocol == models.TaskRPC { + hostIdStrList := strings.Split(form.HostId, ",") + hostIds := make([]int, len(hostIdStrList)) + for i, hostIdStr := range hostIdStrList { + hostIds[i], _ = strconv.Atoi(hostIdStr) + } + taskHostModel.Add(id, hostIds) + } else { + taskHostModel.Remove(id) + } + status, err := taskModel.GetStatus(id) if status == models.Enabled && taskModel.Level == models.TaskLevelParent { addTaskToTimer(id) @@ -205,6 +225,9 @@ func Remove(ctx *macaron.Context) string { return json.CommonFailure(utils.FailureContent, err) } + taskHostModel := new(models.TaskHost) + taskHostModel.Remove(id) + service.Cron.RemoveJob(strconv.Itoa(id)) return json.Success(utils.SuccessContent, nil) @@ -290,9 +313,20 @@ func parseQueryParams(ctx *macaron.Context) (models.CommonMap) { func setHostsToTemplate(ctx *macaron.Context) { hostModel := new(models.Host) + hostModel.PageSize = -1 hosts, err := hostModel.List(models.CommonMap{}) if err != nil { logger.Error(err) } ctx.Data["Hosts"] = hosts +} + +func inHosts(slice []models.TaskHostDetail, element int16) bool { + for _, v := range slice { + if v.HostId == element { + return true + } + } + + return false } \ No newline at end of file diff --git a/service/task.go b/service/task.go index 846acfe..c0fc8a2 100644 --- a/service/task.go +++ b/service/task.go @@ -109,14 +109,14 @@ func (task *Task) Initialize() { } // 批量添加任务 -func (task *Task) BatchAdd(tasks []models.TaskHost) { +func (task *Task) BatchAdd(tasks []models.Task) { for _, item := range tasks { task.Add(item) } } // 添加任务 -func (task *Task) Add(taskModel models.TaskHost) { +func (task *Task) Add(taskModel models.Task) { if taskModel.Level == models.TaskLevelChild { logger.Errorf("添加任务失败#不允许添加子任务到调度器#任务Id-%d", taskModel.Id); return @@ -142,12 +142,12 @@ func (task *Task) StopAll() { } // 直接运行任务 -func (task *Task) Run(taskModel models.TaskHost) { +func (task *Task) Run(taskModel models.Task) { go createJob(taskModel)() } type Handler interface { - Run(taskModel models.TaskHost) (string, error) + Run(taskModel models.Task) (string, error) } @@ -157,7 +157,7 @@ type HTTPHandler struct{} // http任务执行时间不超过300秒 const HttpExecTimeout = 300 -func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error) { +func (h *HTTPHandler) Run(taskModel models.Task) (result string, err error) { if taskModel.Timeout <= 0 || taskModel.Timeout > HttpExecTimeout { taskModel.Timeout = HttpExecTimeout } @@ -173,26 +173,54 @@ func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error) // RPC调用执行任务 type RPCHandler struct {} -func (h *RPCHandler) Run(taskModel models.TaskHost) (result string, err error) { +func (h *RPCHandler) Run(taskModel models.Task) (result string, err error) { taskRequest := new(pb.TaskRequest) taskRequest.Timeout = int32(taskModel.Timeout) taskRequest.Command = taskModel.Command + var resultChan chan TaskResult = make(chan TaskResult, len(taskModel.Hosts)) + for _, taskHost := range taskModel.Hosts { + go func(th models.TaskHostDetail) { + output, err := rpcClient.ExecWithRetry(th.Name, th.Port, taskRequest) + var errorMessage string = "" + if err != nil { + errorMessage = err.Error() + } + outputMessage := fmt.Sprintf("主机: [%s-%s]\n%s\n%s\n\n", + th.Alias, th.Name, errorMessage, output, + ) + resultChan <- TaskResult{Err:err, Result: outputMessage} + }(taskHost) + } - return rpcClient.ExecWithRetry(taskModel.Name, taskModel.Port, taskRequest) + var aggregationErr error = nil + var aggregationResult string = "" + for i := 0; i < len(taskModel.Hosts); i++ { + taskResult := <- resultChan + aggregationResult += taskResult.Result + if taskResult.Err != nil { + aggregationErr = taskResult.Err + } + } + + return aggregationResult, aggregationErr } // 创建任务日志 -func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, error) { +func createTaskLog(taskModel models.Task, status models.Status) (int64, error) { taskLogModel := new(models.TaskLog) taskLogModel.TaskId = taskModel.Id - taskLogModel.Name = taskModel.Task.Name + taskLogModel.Name = taskModel.Name taskLogModel.Spec = taskModel.Spec taskLogModel.Protocol = taskModel.Protocol taskLogModel.Command = taskModel.Command taskLogModel.Timeout = taskModel.Timeout if taskModel.Protocol == models.TaskRPC { - taskLogModel.Hostname = taskModel.Alias + "-" + taskModel.Name + var aggregationHost string = "" + for _, host := range taskModel.Hosts { + aggregationHost += fmt.Sprintf("%s-%s
", host.Alias, host.Name) + } + taskLogModel.Hostname = aggregationHost } taskLogModel.StartTime = time.Now() taskLogModel.Status = status @@ -219,7 +247,7 @@ func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) { } -func createJob(taskModel models.TaskHost) cron.FuncJob { +func createJob(taskModel models.Task) cron.FuncJob { var handler Handler = createHandler(taskModel) if handler == nil { return nil @@ -231,16 +259,16 @@ func createJob(taskModel models.TaskHost) cron.FuncJob { if taskLogId <= 0 { return } - logger.Infof("开始执行任务#%s#命令-%s", taskModel.Task.Name, taskModel.Command) + logger.Infof("开始执行任务#%s#命令-%s", taskModel.Name, taskModel.Command) taskResult := execJob(handler, taskModel) - logger.Infof("任务完成#%s#命令-%s", taskModel.Task.Name, taskModel.Command) + logger.Infof("任务完成#%s#命令-%s", taskModel.Name, taskModel.Command) afterExecJob(taskModel, taskResult, taskLogId) } return taskFunc } -func createHandler(taskModel models.TaskHost) Handler { +func createHandler(taskModel models.Task) Handler { var handler Handler = nil switch taskModel.Protocol { case models.TaskHTTP: @@ -254,7 +282,7 @@ func createHandler(taskModel models.TaskHost) Handler { } // 任务前置操作 -func beforeExecJob(taskModel models.TaskHost) (taskLogId int64) { +func beforeExecJob(taskModel models.Task) (taskLogId int64) { if taskModel.Multi == 0 && runInstance.has(taskModel.Id) { createTaskLog(taskModel, models.Cancel) return @@ -274,7 +302,7 @@ func beforeExecJob(taskModel models.TaskHost) (taskLogId int64) { } // 任务执行后置操作 -func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId int64) { +func afterExecJob(taskModel models.Task, taskResult TaskResult, taskLogId int64) { if taskResult.Err != nil { taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result } @@ -290,7 +318,7 @@ func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId in } // 执行依赖任务, 多个任务并发执行 -func execDependencyTask(taskModel models.TaskHost, taskResult TaskResult) { +func execDependencyTask(taskModel models.Task, taskResult TaskResult) { // 父任务才能执行子任务 if taskModel.Level != models.TaskLevelParent { return @@ -354,7 +382,7 @@ func appendResultToCommand(command string, taskResult TaskResult) string { } // 发送任务结果通知 -func SendNotification(taskModel models.TaskHost, taskResult TaskResult) { +func SendNotification(taskModel models.Task, taskResult TaskResult) { var statusName string // 未开启通知 if taskModel.NotifyStatus == 0 { @@ -376,7 +404,7 @@ func SendNotification(taskModel models.TaskHost, taskResult TaskResult) { msg := notify.Message{ "task_type": taskModel.NotifyType, "task_receiver_id": taskModel.NotifyReceiverId, - "name": taskModel.Task.Name, + "name": taskModel.Name, "output": taskResult.Result, "status": statusName, "taskId": taskModel.Id, @@ -385,7 +413,7 @@ func SendNotification(taskModel models.TaskHost, taskResult TaskResult) { } // 执行具体任务 -func execJob(handler Handler, taskModel models.TaskHost) TaskResult { +func execJob(handler Handler, taskModel models.Task) TaskResult { defer func() { if err := recover(); err != nil { logger.Error("panic#service/task.go:execJob#", err) diff --git a/templates/host/host_form.html b/templates/host/host_form.html index 33d7637..d96dbf4 100644 --- a/templates/host/host_form.html +++ b/templates/host/host_form.html @@ -29,7 +29,7 @@
- +
diff --git a/templates/task/index.html b/templates/task/index.html index b15324b..bbacaea 100644 --- a/templates/task/index.html +++ b/templates/task/index.html @@ -69,14 +69,18 @@ {{{range $i, $v := .Tasks}}} {{{.Id}}} - {{{.Task.Name}}} + {{{.Name}}} {{{if eq .Level 1}}}主任务{{{else}}}子任务{{{end}}} {{{.Spec}}} {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}} {{{if eq .Timeout -1}}}后台运行{{{else if gt .Timeout 0}}}{{{.Timeout}}}秒{{{else}}}不限制{{{end}}} {{{.RetryTimes}}} {{{if gt .Multi 0}}}否{{{else}}}是{{{end}}} - {{{.Alias}}}-{{{.Name}}} + + {{{range $k, $h := .Hosts}}} + {{{$h.Alias}}}
+ {{{end}}} + {{{if eq .Level 1}}} {{{if eq .Status 1}}}激活{{{else}}}停止{{{end}}} diff --git a/templates/task/log.html b/templates/task/log.html index 7bf97e6..7c57c59 100644 --- a/templates/task/log.html +++ b/templates/task/log.html @@ -54,7 +54,7 @@ 任务ID 任务名称 cron表达式 - 协议 + 执行方式 重试次数 任务节点 执行时长 @@ -70,7 +70,7 @@ {{{.Spec}}} {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SHELL {{{end}}} {{{.RetryTimes}}} - {{{.Hostname}}} + {{{unescape .Hostname}}} {{{if and (ne .Status 3) (ne .Status 4)}}} {{{if gt .TotalTime 0}}}{{{.TotalTime}}}秒{{{else}}}1秒{{{end}}}
diff --git a/templates/task/task_form.html b/templates/task/task_form.html index 1ea0525..0178e38 100644 --- a/templates/task/task_form.html +++ b/templates/task/task_form.html @@ -19,7 +19,7 @@
任务名称
- +
@@ -83,20 +83,23 @@
-
+
- - {{{.Alias}}}-{{{.Name}}} + {{{if (HostFormat $i) }}}
{{{end}}} + {{{end}}} -   添加节点 +
 
添加节点
@@ -109,11 +112,11 @@
- +
- +
@@ -313,6 +316,15 @@ return receivers.join(","); } + function parseHostId() { + var hostIds = []; + $('#hostId input:checked').each(function () { + hostIds.push($(this).val()); + }); + + return hostIds.join(","); + } + function changeLevel() { var selected = $('#level').val(); if (selected == 1) { @@ -336,6 +348,12 @@ return false; } fields.notify_receiver_id = parseNotifyReceiver(); + fields.host_id = parseHostId(); + if (fields.protocol == 2 && fields.host_id == "") { + swal('错误提示', '请选择任务节点'); + return false; + } + util.post('/task/store', fields, function(code, message) { location.href = "/task" });