From 1e62bf7f3ba53eddc6897e812d8635830793f129 Mon Sep 17 00:00:00 2001 From: ouqiang Date: Fri, 21 Apr 2017 14:50:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=89=A7=E8=A1=8C=E5=A4=B1?= =?UTF-8?q?=E8=B4=A5=E5=90=8E=EF=BC=8C=E6=A0=B9=E6=8D=AE=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=87=8D=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/task.go | 1 + models/task_log.go | 1 + modules/ssh/ssh.go | 2 +- routers/task/task.go | 26 ++++++------ service/task.go | 80 +++++++++++++++++++++++++++-------- templates/task/index.html | 2 + templates/task/log.html | 2 + templates/task/task_form.html | 10 +++-- 8 files changed, 91 insertions(+), 33 deletions(-) diff --git a/models/task.go b/models/task.go index 846cc56..f885829 100644 --- a/models/task.go +++ b/models/task.go @@ -21,6 +21,7 @@ type Task struct { Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh-command 3: 本地命令 Command string `xorm:"varchar(512) notnull"` // URL地址或shell命令 Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制 + RetryTimes int8 `xorm:"tinyint notnull default 0"` // 重试次数 HostId int16 `xorm:"smallint notnull default 0"` // SSH host id, Remark string `xorm:"varchar(512) notnull default ''"` // 备注 Created time.Time `xorm:"datetime notnull created"` // 创建时间 diff --git a/models/task_log.go b/models/task_log.go index f840e78..2d4ea54 100644 --- a/models/task_log.go +++ b/models/task_log.go @@ -16,6 +16,7 @@ type TaskLog struct { Protocol TaskProtocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh-command Command string `xorm:"varchar(512) notnull"` // URL地址或shell命令 Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制 + RetryTimes int8 `xorm:"tinyint notnull default 0"` // 任务重试次数 Hostname string `xorm:"varchar(512) notnull defalut '' "` // SSH主机名,逗号分隔 StartTime time.Time `xorm:"datetime created"` // 开始执行时间 EndTime time.Time `xorm:"datetime updated"` // 执行完成(失败)时间 diff --git a/modules/ssh/ssh.go b/modules/ssh/ssh.go index 0b0c4e0..45eb5a8 100644 --- a/modules/ssh/ssh.go +++ b/modules/ssh/ssh.go @@ -8,7 +8,6 @@ import ( "errors" ) - type HostAuthType int8 // 认证方式 const ( @@ -101,6 +100,7 @@ func Exec(sshConfig SSHConfig, cmd string) (output string, err error) { output, err := session.CombinedOutput(cmd) resultChan <- Result{string(output), err} }() + // todo 等待超时后,如何停止远程正在执行的任务, 使用timeout命令,但不具有通用性 go triggerTimeout(timeoutChan, sshConfig.ExecTimeout) select { case result := <- resultChan: diff --git a/routers/task/task.go b/routers/task/task.go index 2223040..045ded5 100644 --- a/routers/task/task.go +++ b/routers/task/task.go @@ -10,6 +10,19 @@ import ( "github.com/jakecoffman/cron" ) +type TaskForm struct { + Id int + Name string `binding:"Required;"` + Spec string `binding:"Required;MaxSize(64)"` + Protocol models.TaskProtocol `binding:"In(1,2,3)"` + Command string `binding:"Required;MaxSize(512)"` + Timeout int `binding:"Range(0,86400)"` + RetryTimes int8 + HostId int16 + Remark string + Status models.Status `binding:"In(1,2)"` +} + func Index(ctx *macaron.Context) { taskModel := new(models.Task) tasks, err := taskModel.List() @@ -59,18 +72,6 @@ func Edit(ctx *macaron.Context) { ctx.HTML(200, "task/task_form") } -type TaskForm struct { - Id int - Name string `binding:"Required;"` - Spec string `binding:"Required;MaxSize(64)"` - Protocol models.TaskProtocol `binding:"In(1,2,3)"` - Command string `binding:"Required;MaxSize(512)"` - Timeout int `binding:"Range(0,86400)"` - HostId int16 - Remark string - Status models.Status `binding:"In(1,2)"` -} - // 保存任务 func Store(ctx *macaron.Context, form TaskForm) string { json := utils.JsonResponse{} @@ -99,6 +100,7 @@ func Store(ctx *macaron.Context, form TaskForm) string { taskModel.HostId = form.HostId taskModel.Remark = form.Remark taskModel.Status = form.Status + taskModel.RetryTimes = form.RetryTimes if taskModel.Status != models.Enabled { taskModel.Status = models.Disabled } diff --git a/service/task.go b/service/task.go index 3a78fc7..0410549 100644 --- a/service/task.go +++ b/service/task.go @@ -11,12 +11,19 @@ import ( "github.com/jakecoffman/cron" "github.com/ouqiang/gocron/modules/utils" "errors" + "fmt" ) var Cron *cron.Cron type Task struct{} +type TaskResult struct { + Result string + Err error + RetryTimes int8 +} + // 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行 func (task *Task) Initialize() { Cron = cron.New() @@ -43,7 +50,7 @@ func (task *Task) BatchAdd(tasks []models.TaskHost) { // 添加任务 func (task *Task) Add(taskModel models.TaskHost) { - taskFunc := createHandlerJob(taskModel) + taskFunc := createJob(taskModel) if taskFunc == nil { logger.Error("创建任务处理Job失败,不支持的任务协议#", taskModel.Protocol) return @@ -59,7 +66,7 @@ func (task *Task) Add(taskModel models.TaskHost) { // 直接运行任务 func (task *Task) Run(taskModel models.TaskHost) { - go createHandlerJob(taskModel)() + go createJob(taskModel)() } type Handler interface { @@ -69,6 +76,7 @@ type Handler interface { // 本地命令 type LocalCommandHandler struct {} +// 运行本地命令 func (h *LocalCommandHandler) Run(taskModel models.TaskHost) (string, error) { if taskModel.Command == "" { return "", errors.New("invalid command") @@ -81,6 +89,7 @@ func (h *LocalCommandHandler) Run(taskModel models.TaskHost) (string, error) { return h.runOnUnix(taskModel) } +// 执行Windows命令 func (h *LocalCommandHandler) runOnWindows(taskModel models.TaskHost) (string, error) { outputGBK, err := utils.ExecShellWithTimeout(taskModel.Timeout, "cmd", "/C", taskModel.Command) // windows平台编码为gbk,需转换为utf8才能入库 @@ -92,6 +101,7 @@ func (h *LocalCommandHandler) runOnWindows(taskModel models.TaskHost) (string, e return "命令输出转换编码失败(gbk to utf8)", err } +// 执行Unix命令 func (h *LocalCommandHandler) runOnUnix(taskModel models.TaskHost) (string, error) { return utils.ExecShellWithTimeout(taskModel.Timeout, "/bin/bash", "-c", taskModel.Command) } @@ -125,6 +135,10 @@ func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error) body, err := ioutil.ReadAll(resp.Body) if err != nil { logger.Error("任务处理#读取HTTP请求返回值失败-", err.Error()) + return + } + if resp.StatusCode != 200 { + return string(body), errors.New(fmt.Sprintf("HTTP状态码非200-%d", resp.StatusCode)) } return string(body), err @@ -146,6 +160,7 @@ func (h *SSHCommandHandler) Run(taskModel models.TaskHost) (string, error) { return ssh.Exec(sshConfig, taskModel.Command) } +// 创建任务日志 func createTaskLog(taskModel models.TaskHost) (int64, error) { taskLogModel := new(models.TaskLog) taskLogModel.TaskId = taskModel.Id @@ -164,11 +179,13 @@ func createTaskLog(taskModel models.TaskHost) (int64, error) { return insertId, err } -func updateTaskLog(taskLogId int64, result string, err error) (int64, error) { +// 更新任务日志 +func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) { taskLogModel := new(models.TaskLog) var status models.Status - if err != nil { - result = err.Error() + " " + result + var result string + if taskResult.Err != nil { + result = taskResult.Err.Error() + " " + result status = models.Failure } else { status = models.Finish @@ -180,16 +197,8 @@ func updateTaskLog(taskLogId int64, result string, err error) (int64, error) { } -func createHandlerJob(taskModel models.TaskHost) cron.FuncJob { - var handler Handler = nil - switch taskModel.Protocol { - case models.TaskHTTP: - handler = new(HTTPHandler) - case models.TaskSSH: - handler = new(SSHCommandHandler) - case models.TaskLocalCommand: - handler = new(LocalCommandHandler) - } +func createJob(taskModel models.TaskHost) cron.FuncJob { + var handler Handler = createHandler(taskModel) if handler == nil { return nil } @@ -199,8 +208,8 @@ func createHandlerJob(taskModel models.TaskHost) cron.FuncJob { logger.Error("任务开始执行#写入任务日志失败-", err) return } - result, err := handler.Run(taskModel) - _, err = updateTaskLog(taskLogId, result, err) + taskResult := execJob(handler, taskModel) + _, err = updateTaskLog(taskLogId, taskResult) if err != nil { logger.Error("任务结束#更新任务日志失败-", err) } @@ -208,3 +217,40 @@ func createHandlerJob(taskModel models.TaskHost) cron.FuncJob { return taskFunc } + +func createHandler(taskModel models.TaskHost) Handler { + var handler Handler = nil + switch taskModel.Protocol { + case models.TaskHTTP: + handler = new(HTTPHandler) + case models.TaskSSH: + handler = new(SSHCommandHandler) + case models.TaskLocalCommand: + handler = new(LocalCommandHandler) + } + + return handler; +} + +func execJob(handler Handler, taskModel models.TaskHost) TaskResult { + // 默认只运行任务一次 + var execTimes int8 = 1 + if (taskModel.RetryTimes > 0) { + execTimes += taskModel.RetryTimes + } + var i int8 = 0 + var output string + var err error + for i < execTimes { + output, err = handler.Run(taskModel) + if err == nil { + return TaskResult{Result: output, Err: err, RetryTimes: i} + } + i++ + if i < execTimes { + logger.Warnf("任务执行失败#任务id-%d#重试第%d次#输出-%s#错误信息-%s", taskModel.Id, i, output, err) + } + } + + return TaskResult{Result: output, Err: err, RetryTimes: taskModel.RetryTimes} +} \ No newline at end of file diff --git a/templates/task/index.html b/templates/task/index.html index 6d45e08..dab3b0b 100644 --- a/templates/task/index.html +++ b/templates/task/index.html @@ -21,6 +21,7 @@ 协议 命令 超时时间(秒) + 重试次数 主机 备注 状态 @@ -35,6 +36,7 @@ {{{if eq .Protocol 1}}} HTTP {{{else}}} SSH {{{end}}} {{{.Command}}} {{{.Timeout}}} + {{{.RetryTimes}}} {{{.Alias}}} {{{.Remark}}} {{{if eq .Status 1}}} {{{else}}} {{{end}}} diff --git a/templates/task/log.html b/templates/task/log.html index ba8f336..cb8862b 100644 --- a/templates/task/log.html +++ b/templates/task/log.html @@ -29,6 +29,7 @@ cron表达式 协议 超时时间 + 重试次数 主机 执行时长 状态 @@ -42,6 +43,7 @@ {{{.Spec}}} {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SSH {{{else}}} 本地命令 {{{end}}} {{{.Timeout}}}秒 + {{{.RetryTimes}}} {{{.Hostname}}} {{{if gt .TotalTime 0}}}{{{.TotalTime}}}秒{{{else}}}1秒{{{end}}}
diff --git a/templates/task/task_form.html b/templates/task/task_form.html index 1041196..04a28c6 100644 --- a/templates/task/task_form.html +++ b/templates/task/task_form.html @@ -73,13 +73,17 @@ -
+
- +
+
+ + +
-
+