From c4af1653ff1370ef91d84576fdce749776ae59ae Mon Sep 17 00:00:00 2001 From: ouqiang Date: Thu, 7 Sep 2017 21:32:53 +0800 Subject: [PATCH] =?UTF-8?q?feat($task):=20=E5=A2=9E=E5=8A=A0=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E6=A0=87=E7=AD=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/migration.go | 36 ++++++++++++++++++++++++++++------- models/task.go | 9 ++++++++- routers/install/install.go | 4 ++++ routers/task/task.go | 7 +++++-- service/task.go | 34 --------------------------------- templates/task/index.html | 7 +++++++ templates/task/task_form.html | 9 +++++++++ 7 files changed, 62 insertions(+), 44 deletions(-) diff --git a/models/migration.go b/models/migration.go index ee186ea..8d3999d 100644 --- a/models/migration.go +++ b/models/migration.go @@ -49,21 +49,30 @@ func isDatabaseExist(name string) bool { // 迭代升级数据库, 新建表、新增字段等 func (migration *Migration) Upgrade(oldVersionId int) { - versionIds := []int{110} - upgradeFuncs := []func(*xorm.Session) error { - migration.upgradeFor110, + // v1.2版本不支持升级 + if oldVersionId == 120 { + return } - // 默认当前版本为v1.0 - startIndex := 0 + versionIds := []int{110, 122} + upgradeFuncs := []func(*xorm.Session) error { + migration.upgradeFor110, + migration.upgradeFor122, + } + + startIndex := -1 // 从当前版本的下一版本开始升级 for i, value := range versionIds { - if oldVersionId == value { - startIndex = i + 1 + if value > oldVersionId { + startIndex = i break; } } + if startIndex == -1 { + return + } + length := len(versionIds) if startIndex >= length { return @@ -135,3 +144,16 @@ func (migration *Migration) upgradeFor110(session *xorm.Session) error { return err } + +// 升级到1.2.2版本 +func (migration *Migration) upgradeFor122(session *xorm.Session) error { + logger.Info("开始升级到v1.2.2") + + tableName := TablePrefix + "task" + // task表增加tag字段 + _, err := session.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN tag VARCHAR(32) NOT NULL DEFAULT '' ", tableName)) + + logger.Info("已升级到v1.2.2\n") + + return err +} \ No newline at end of file diff --git a/models/task.go b/models/task.go index a0fe28e..c2c62ee 100644 --- a/models/task.go +++ b/models/task.go @@ -44,6 +44,7 @@ type Task struct { NotifyStatus int8 `xorm:"smallint notnull default 1"` // 任务执行结束是否通知 0: 不通知 1: 失败通知 2: 执行结束通知 NotifyType int8 `xorm:"smallint notnull default 0"` // 通知类型 1: 邮件 2: slack NotifyReceiverId string `xorm:"varchar(256) notnull default '' "` // 通知接受者ID, setting表主键ID,多个ID逗号分隔 + Tag string `xorm:"varchar(32) notnull default ''"` Remark string `xorm:"varchar(100) notnull default ''"` // 备注 Status Status `xorm:"tinyint notnull index default 0"` // 状态 1:正常 0:停止 Created time.Time `xorm:"datetime notnull created"` // 创建时间 @@ -73,6 +74,7 @@ func (task *Task) CreateTestTask() { task.Level = TaskLevelParent task.Protocol = TaskHTTP task.Spec = "*/30 * * * * *" + task.Tag = "test-task" // 查询IP地址区域信息 task.Command = "http://ip.taobao.com/service/getIpInfo.php?ip=117.27.140.253" task.Status = Enabled @@ -81,7 +83,7 @@ func (task *Task) CreateTestTask() { func (task *Task) UpdateBean(id int) (int64, error) { return Db.ID(id). - Cols("name,spec,protocol,command,timeout,multi,retry_times,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status"). + Cols("name,spec,protocol,command,timeout,multi,retry_times,remark,notify_status,notify_type,notify_receiver_id, dependency_task_id, dependency_status, tag"). Update(task) } @@ -261,5 +263,10 @@ func (task *Task) parseWhere(session *xorm.Session, params CommonMap) { if ok && status.(int) > -1 { session.And("status = ?", status) } + + tag, ok := params["Tag"] + if ok && tag.(string) != "" { + session.And("tag = ? ", tag) + } } diff --git a/routers/install/install.go b/routers/install/install.go index b3c7458..821839f 100644 --- a/routers/install/install.go +++ b/routers/install/install.go @@ -120,6 +120,10 @@ func writeConfig(form InstallForm) error { "app.name", "定时任务管理系统", // 应用名称 "api.key", "", "api.secret", "", + "enable_tls", "false", + "ca_file", "", + "cert_file", "", + "key_file", "", } return setting.Write(dbConfig, app.AppConfig) diff --git a/routers/task/task.go b/routers/task/task.go index 9958f16..b35cd8d 100644 --- a/routers/task/task.go +++ b/routers/task/task.go @@ -29,6 +29,7 @@ type TaskForm struct { Multi int8 `binding:"In(1,2)"` RetryTimes int8 HostId string + Tag string Remark string NotifyStatus int8 `binding:"In(1,2,3)"` NotifyType int8 `binding:"In(1,2,3)"` @@ -63,8 +64,8 @@ func Index(ctx *macaron.Context) { if ok { safeNameHTML = template.HTMLEscapeString(name) } - PageParams := fmt.Sprintf("id=%d&host_id=%d&name=%s&protocol=%d&status=%d&page_size=%d", - queryParams["Id"], queryParams["HostId"], safeNameHTML, queryParams["Protocol"], queryParams["Status"], queryParams["PageSize"]); + PageParams := fmt.Sprintf("id=%d&host_id=%d&name=%s&protocol=%d&tag=%s&status=%d&page_size=%d", + queryParams["Id"], queryParams["HostId"], safeNameHTML, queryParams["Protocol"], queryParams["Tag"], queryParams["Status"], queryParams["PageSize"]); queryParams["PageParams"] = template.URL(PageParams) p := paginater.New(int(total), queryParams["PageSize"].(int), queryParams["Page"].(int), 5) ctx.Data["Pagination"] = p @@ -131,6 +132,7 @@ func Store(ctx *macaron.Context, form TaskForm) string { taskModel.Protocol = form.Protocol taskModel.Command = form.Command taskModel.Timeout = form.Timeout + taskModel.Tag = form.Tag taskModel.Remark = form.Remark taskModel.Multi = form.Multi taskModel.RetryTimes = form.RetryTimes @@ -301,6 +303,7 @@ func parseQueryParams(ctx *macaron.Context) (models.CommonMap) { params["HostId"] = ctx.QueryInt("host_id") params["Name"] = ctx.QueryTrim("name") params["Protocol"] = ctx.QueryInt("protocol") + params["Tag"] = ctx.QueryTrim("tag") status := ctx.QueryInt("status") if status >=0 { status -= 1 diff --git a/service/task.go b/service/task.go index 1585d61..ca2d403 100644 --- a/service/task.go +++ b/service/task.go @@ -14,9 +14,6 @@ import ( rpcClient "github.com/ouqiang/gocron/modules/rpc/client" pb "github.com/ouqiang/gocron/modules/rpc/proto" "strings" - "text/template" - "bytes" - "encoding/base64" ) // 定时任务调度管理器 @@ -304,9 +301,6 @@ func beforeExecJob(taskModel models.Task) (taskLogId int64) { // 任务执行后置操作 func afterExecJob(taskModel models.Task, taskResult TaskResult, taskLogId int64) { - if taskResult.Err != nil { - taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result - } _, err := updateTaskLog(taskLogId, taskResult) if err != nil { logger.Error("任务结束#更新任务日志失败-", err) @@ -349,39 +343,11 @@ func execDependencyTask(taskModel models.Task, taskResult TaskResult) { } serviceTask := new(Task) for _, task := range tasks { - task.Command = appendResultToCommand(task.Command, taskResult) task.Spec = fmt.Sprintf("依赖任务(主任务ID-%d)", taskModel.Id) serviceTask.Run(task) } } -/** - * 添加主任务执行结果到子任务命令中, 占位符{{.Code}} {{.Message}} - */ -func appendResultToCommand(command string, taskResult TaskResult) string { - var code int8 = 0 - if taskResult.Err != nil { - code = 1 - } - data := map[string]interface{} { - "Code": code, - "Message": base64.StdEncoding.EncodeToString([]byte(taskResult.Result)), - } - var buf *bytes.Buffer = new(bytes.Buffer) - tmpl, err := template.New("command").Parse(command) - if err != nil { - logger.Errorf("替换子任务命令占位符失败#%s", err.Error()) - return command - } - err = tmpl.Execute(buf, data) - if err != nil { - logger.Errorf("替换子任务命令占位符失败#%s", err.Error()) - return command - } - - return buf.String() -} - // 发送任务结果通知 func SendNotification(taskModel models.Task, taskResult TaskResult) { var statusName string diff --git a/templates/task/index.html b/templates/task/index.html index 12ca680..949fe4a 100644 --- a/templates/task/index.html +++ b/templates/task/index.html @@ -22,6 +22,11 @@
+
+ +
+ + + +
+ +
+ +
+