From 50638e6dfa41036d30557baf8753cdec9e26e319 Mon Sep 17 00:00:00 2001 From: ouqiang Date: Sat, 27 Jan 2018 18:08:46 +0800 Subject: [PATCH] =?UTF-8?q?HTTP=E4=BB=BB=E5=8A=A1=E6=94=AF=E6=8C=81POST?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=20Close=20#19?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/web.go | 10 ++-- models/migration.go | 11 ++++- models/model.go | 4 +- models/task.go | 14 ++++-- modules/app/app.go | 11 +++-- modules/httpclient/http_client.go | 3 +- modules/notify/notify.go | 2 +- modules/rpc/server/server.go | 6 --- modules/utils/utils.go | 14 +----- modules/utils/utils_unix.go | 2 +- routers/host/host.go | 5 +- routers/install/install.go | 3 +- routers/task/task.go | 18 +++---- routers/tasklog/task_log.go | 3 +- service/task.go | 82 ++++++++++++++++++++----------- templates/task/log.html | 10 ++-- templates/task/task_form.html | 12 +++++ 17 files changed, 120 insertions(+), 90 deletions(-) diff --git a/cmd/web.go b/cmd/web.go index 32bccaa..6dd9220 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -78,13 +78,12 @@ func initModule() { upgradeIfNeed() // 初始化定时任务 - serviceTask := new(service.Task) - serviceTask.Initialize() + service.ServiceTask.Initialize() } // 解析端口 func parsePort(ctx *cli.Context) int { - var port int = DefaultPort + port := DefaultPort if ctx.IsSet("port") { port = ctx.Int("port") } @@ -104,7 +103,7 @@ func parseHost(ctx *cli.Context) string { } func setEnvironment(ctx *cli.Context) { - var env string = "prod" + env := "prod" if ctx.IsSet("env") { env = ctx.String("env") } @@ -147,10 +146,9 @@ func shutdown() { return } logger.Info("应用准备退出") - serviceTask := new(service.Task) // 停止所有任务调度 logger.Info("停止定时任务调度") - serviceTask.WaitAndExit() + service.ServiceTask.WaitAndExit() } // 判断应用是否需要升级, 当存在版本号文件且版本小于app.VersionId时升级 diff --git a/models/migration.go b/models/migration.go index 2ce201d..3f5b386 100644 --- a/models/migration.go +++ b/models/migration.go @@ -178,7 +178,16 @@ func (migration *Migration) upgradeFor140(session *xorm.Session) error { tableName := TablePrefix + "task" // task表增加字段 // retry_interval 重试间隔时间(秒) - _, err := session.Exec(fmt.Sprintf("ALTER TABLE %s ADD COLUMN retry_interval SMALLINT NOT NULL DEFAULT 0", tableName)) + // http_method http请求方法 + sql := fmt.Sprintf( + "ALTER TABLE %s ADD COLUMN retry_interval SMALLINT NOT NULL DEFAULT 0,ADD COLUMN http_method TINYINT NOT NULL DEFAULT 1", tableName) + _, err := session.Exec(sql) + + if err != nil { + return err + } + + logger.Info("已升级到v1.4\n") diff --git a/models/model.go b/models/model.go index 81195b9..1270978 100644 --- a/models/model.go +++ b/models/model.go @@ -16,7 +16,7 @@ import ( type Status int8 type CommonMap map[string]interface{} -var TablePrefix string = "" +var TablePrefix = "" var Db *xorm.Engine const ( @@ -99,7 +99,7 @@ func CreateTmpDb(setting *setting.Setting) (*xorm.Engine, error) { // 获取数据库引擎DSN mysql,sqlite func getDbEngineDSN(setting *setting.Setting) string { engine := strings.ToLower(setting.Db.Engine) - var dsn string = "" + dsn := "" switch engine { case "mysql": dsn = fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=%s", diff --git a/models/task.go b/models/task.go index a9ab26f..6c397ec 100644 --- a/models/task.go +++ b/models/task.go @@ -28,6 +28,13 @@ const ( TaskDependencyStatusWeak TaskDependencyStatus = 2 // 弱依赖 ) +type TaskHTTPMethod int8 + +const ( + TaskHTTPMethodGet TaskHTTPMethod = 1; + TaskHttpMethodPost TaskHTTPMethod = 2; +) + // 任务 type Task struct { Id int `xorm:"int pk autoincr"` @@ -38,6 +45,7 @@ type Task struct { Spec string `xorm:"varchar(64) notnull"` // crontab Protocol TaskProtocol `xorm:"tinyint notnull index"` // 协议 1:http 2:系统命令 Command string `xorm:"varchar(256) notnull"` // URL地址或shell命令 + HttpMethod TaskHTTPMethod `xorm:"tinyint notnull default 1"` // http请求方法 Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制 Multi int8 `xorm:"tinyint notnull default 1"` // 是否允许多实例运行 RetryTimes int8 `xorm:"tinyint notnull default 0"` // 重试次数 @@ -84,9 +92,9 @@ func (task *Task) CreateTestTask() { func (task *Task) UpdateBean(id int) (int64, error) { return Db.ID(id). - Cols("name,spec,protocol,command,timeout,multi," + - "retry_times,retry_interval,remark,notify_status," + - "notify_type,notify_receiver_id, dependency_task_id, dependency_status, tag"). + Cols(`name,spec,protocol,command,timeout,multi, + retry_times,retry_interval,remark,notify_status, + notify_type,notify_receiver_id, dependency_task_id, dependency_status, tag,http_method`). Update(task) } diff --git a/modules/app/app.go b/modules/app/app.go index 5551438..3fc60c8 100644 --- a/modules/app/app.go +++ b/modules/app/app.go @@ -110,11 +110,12 @@ func ToNumberVersion(versionString string) int { // 检测目录是否存在 func createDirIfNotExists(path ...string) { for _, value := range path { - if !utils.FileExist(value) { - err := os.Mkdir(value, 0755) - if err != nil { - logger.Fatal(fmt.Sprintf("创建目录失败:%s", err.Error())) - } + if utils.FileExist(value) { + continue + } + err := os.Mkdir(value, 0755) + if err != nil { + logger.Fatal(fmt.Sprintf("创建目录失败:%s", err.Error())) } } } diff --git a/modules/httpclient/http_client.go b/modules/httpclient/http_client.go index 29f4334..afd9c20 100644 --- a/modules/httpclient/http_client.go +++ b/modules/httpclient/http_client.go @@ -73,8 +73,7 @@ func request(req *http.Request, timeout int) ResponseWrapper { } func setRequestHeader(req *http.Request) { - req.Header.Set("Accept-Language", "zh-CN,zh;q=0.8,en;q=0.6") - req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.133 Safari/537.36 golang/gocron") + req.Header.Set("User-Agent", "golang/gocron") } func createRequestError(err error) ResponseWrapper { diff --git a/modules/notify/notify.go b/modules/notify/notify.go index 1e34ddf..083ff24 100644 --- a/modules/notify/notify.go +++ b/modules/notify/notify.go @@ -12,7 +12,7 @@ type Notifiable interface { Send(msg Message) } -var queue chan Message = make(chan Message, 100) +var queue = make(chan Message, 100) func init() { go run() diff --git a/modules/rpc/server/server.go b/modules/rpc/server/server.go index 1e0ad55..9dc3c33 100644 --- a/modules/rpc/server/server.go +++ b/modules/rpc/server/server.go @@ -32,12 +32,6 @@ func (s Server) Run(ctx context.Context, req *pb.TaskRequest) (*pb.TaskResponse, } func Start(addr string, enableTLS bool, certificate auth.Certificate) { - defer func() { - if err := recover(); err != nil { - grpclog.Println("panic", err) - } - }() - l, err := net.Listen("tcp", addr) if err != nil { grpclog.Fatal(err) diff --git a/modules/utils/utils.go b/modules/utils/utils.go index a1cacab..62c08c8 100644 --- a/modules/utils/utils.go +++ b/modules/utils/utils.go @@ -3,11 +3,9 @@ package utils import ( "crypto/md5" "encoding/hex" - "fmt" "github.com/Tang-RoseChild/mahonia" "math/rand" "os" - "runtime" "strings" "time" ) @@ -41,11 +39,6 @@ func RandNumber(max int) int { return r.Intn(max) } -// 判断当前系统是否是windows -func IsWindows() bool { - return runtime.GOOS == "windows" -} - // GBK编码转换为UTF8 func GBK2UTF8(s string) (string, bool) { dec := mahonia.NewDecoder("gbk") @@ -99,9 +92,4 @@ func FileExist(file string) bool { } return true -} - -// 格式化环境变量 -func FormatUnixEnv(key, value string) string { - return fmt.Sprintf("export %s=%s; ", key, value) -} +} \ No newline at end of file diff --git a/modules/utils/utils_unix.go b/modules/utils/utils_unix.go index d6d68c9..0122e18 100644 --- a/modules/utils/utils_unix.go +++ b/modules/utils/utils_unix.go @@ -20,7 +20,7 @@ func ExecShell(ctx context.Context, command string) (string, error) { cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } - var resultChan chan Result = make(chan Result) + resultChan := make(chan Result) go func() { output, err := cmd.CombinedOutput() resultChan <- Result{string(output), err} diff --git a/routers/host/host.go b/routers/host/host.go index 2747bd1..b029356 100644 --- a/routers/host/host.go +++ b/routers/host/host.go @@ -127,8 +127,7 @@ func Store(ctx *macaron.Context, form HostForm) string { if err != nil { return json.CommonFailure("刷新任务主机信息失败", err) } - serviceTask := new(service.Task) - serviceTask.BatchAdd(tasks) + service.ServiceTask.BatchAdd(tasks) } return json.Success("保存成功", nil) @@ -190,7 +189,7 @@ func Ping(ctx *macaron.Context) string { // 解析查询参数 func parseQueryParams(ctx *macaron.Context) models.CommonMap { - var params models.CommonMap = models.CommonMap{} + var params = models.CommonMap{} params["Id"] = ctx.QueryInt("id") params["Name"] = ctx.QueryTrim("name") base.ParsePageAndPageSize(ctx, params) diff --git a/routers/install/install.go b/routers/install/install.go index f9bc750..43cc734 100644 --- a/routers/install/install.go +++ b/routers/install/install.go @@ -97,8 +97,7 @@ func Store(ctx *macaron.Context, form InstallForm) string { app.Installed = true // 初始化定时任务 - serviceTask := new(service.Task) - serviceTask.Initialize() + service.ServiceTask.Initialize() return json.Success("安装成功", nil) } diff --git a/routers/task/task.go b/routers/task/task.go index 74876c4..f6471a8 100644 --- a/routers/task/task.go +++ b/routers/task/task.go @@ -25,6 +25,7 @@ type TaskForm struct { Spec string Protocol models.TaskProtocol `binding:"In(1,2)"` Command string `binding:"Required;MaxSize(256)"` + HttpMethod models.TaskHTTPMethod `binding:"In(1,2)"` Timeout int `binding:"Range(0,86400)"` Multi int8 `binding:"In(1,2)"` RetryTimes int8 @@ -111,11 +112,11 @@ func Edit(ctx *macaron.Context) { ctx.HTML(200, "task/task_form") } -// 保存任务 +// 保存任务 todo 拆分为多个方法 快变成意大利面条式代码了 func Store(ctx *macaron.Context, form TaskForm) string { json := utils.JsonResponse{} taskModel := models.Task{} - var id int = form.Id + var id = form.Id nameExists, err := taskModel.NameExist(form.Name, form.Id) if err != nil { return json.CommonFailure(utils.FailureContent, err) @@ -130,7 +131,7 @@ func Store(ctx *macaron.Context, form TaskForm) string { taskModel.Name = form.Name taskModel.Protocol = form.Protocol - taskModel.Command = form.Command + taskModel.Command = strings.TrimSpace(form.Command) taskModel.Timeout = form.Timeout taskModel.Tag = form.Tag taskModel.Remark = form.Remark @@ -150,6 +151,7 @@ func Store(ctx *macaron.Context, form TaskForm) string { if taskModel.NotifyStatus > 0 && taskModel.NotifyReceiverId == "" { return json.CommonFailure("至少选择一个通知接收者") } + taskModel.HttpMethod = form.HttpMethod if taskModel.Protocol == models.TaskHTTP { command := strings.ToLower(taskModel.Command) if !strings.HasPrefix(command, "http://") && !strings.HasPrefix(command, "https://") { @@ -235,7 +237,7 @@ func Remove(ctx *macaron.Context) string { taskHostModel := new(models.TaskHost) taskHostModel.Remove(id) - service.Cron.RemoveJob(strconv.Itoa(id)) + service.ServiceTask.Remove(id) return json.Success(utils.SuccessContent, nil) } @@ -261,8 +263,7 @@ func Run(ctx *macaron.Context) string { } task.Spec = "手动运行" - serviceTask := new(service.Task) - serviceTask.Run(task) + service.ServiceTask.Run(task) return json.Success("任务已开始运行, 请到任务日志中查看结果", nil) } @@ -282,7 +283,7 @@ func changeStatus(ctx *macaron.Context, status models.Status) string { if status == models.Enabled { addTaskToTimer(id) } else { - service.Cron.RemoveJob(strconv.Itoa(id)) + service.ServiceTask.Remove(id) } return json.Success(utils.SuccessContent, nil) @@ -297,8 +298,7 @@ func addTaskToTimer(id int) { return } - taskService := service.Task{} - taskService.Add(task) + service.ServiceTask.Add(task) } // 解析查询参数 diff --git a/routers/tasklog/task_log.go b/routers/tasklog/task_log.go index 1a0a838..c7333d2 100644 --- a/routers/tasklog/task_log.go +++ b/routers/tasklog/task_log.go @@ -65,9 +65,8 @@ func Stop(ctx *macaron.Context) string { if len(task.Hosts) == 0 { return json.CommonFailure("任务节点列表为空") } - serviceTask := new(service.Task) for _, host := range task.Hosts { - serviceTask.Stop(host.Name, host.Port, id) + service.ServiceTask.Stop(host.Name, host.Port, id) } diff --git a/service/task.go b/service/task.go index e5b20c7..9b737b9 100644 --- a/service/task.go +++ b/service/task.go @@ -17,14 +17,27 @@ import ( "net/http" ) -// 定时任务调度管理器 -var Cron *cron.Cron +var ( + ServiceTask Task +) -// 同一任务是否有实例处于运行中 -var runInstance Instance +var ( + // 定时任务调度管理器 + serviceCron *cron.Cron -// 任务计数-正在运行中的任务 -var taskCount TaskCount + // 同一任务是否有实例处于运行中 + runInstance Instance + + // 任务计数-正在运行的任务 + taskCount TaskCount +) + +func init() { + serviceCron = cron.New() + serviceCron.Start() + taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)} + go taskCount.Wait() +} // 任务计数 type TaskCount struct { @@ -79,14 +92,9 @@ type TaskResult struct { RetryTimes int8 } -// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行 -func (task *Task) Initialize() { - Cron = cron.New() - Cron.Start() - runInstance = Instance{} - taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)} - go taskCount.Wait() +// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行 +func (task Task) Initialize() { taskModel := new(models.Task) taskList, err := taskModel.ActiveList() if err != nil { @@ -101,14 +109,14 @@ func (task *Task) Initialize() { } // 批量添加任务 -func (task *Task) BatchAdd(tasks []models.Task) { +func (task Task) BatchAdd(tasks []models.Task) { for _, item := range tasks { task.Add(item) } } // 添加任务 -func (task *Task) Add(taskModel models.Task) { +func (task Task) Add(taskModel models.Task) { if taskModel.Level == models.TaskLevelChild { logger.Errorf("添加任务失败#不允许添加子任务到调度器#任务Id-%d", taskModel.Id) return @@ -121,26 +129,30 @@ func (task *Task) Add(taskModel models.Task) { cronName := strconv.Itoa(taskModel.Id) // Cron任务采用数组存储, 删除任务需遍历数组, 并对数组重新赋值, 任务较多时,有性能问题 - Cron.RemoveJob(cronName) - err := Cron.AddFunc(taskModel.Spec, taskFunc, cronName) + serviceCron.RemoveJob(cronName) + err := serviceCron.AddFunc(taskModel.Spec, taskFunc, cronName) if err != nil { logger.Error("添加任务到调度器失败#", err) } } // 停止运行中的任务 -func (task *Task) Stop(ip string, port int, id int64) { +func (task Task) Stop(ip string, port int, id int64) { rpcClient.Stop(ip, port, id) } +func (task Task) Remove(id int) { + serviceCron.RemoveJob(strconv.Itoa(id)) +} + // 等待所有任务结束后退出 -func (task *Task) WaitAndExit() { - Cron.Stop() +func (task Task) WaitAndExit() { + serviceCron.Stop() taskCount.Exit() } // 直接运行任务 -func (task *Task) Run(taskModel models.Task) { +func (task Task) Run(taskModel models.Task) { go createJob(taskModel)() } @@ -158,7 +170,18 @@ func (h *HTTPHandler) Run(taskModel models.Task, taskUniqueId int64) (result str if taskModel.Timeout <= 0 || taskModel.Timeout > HttpExecTimeout { taskModel.Timeout = HttpExecTimeout } - resp := httpclient.Get(taskModel.Command, taskModel.Timeout) + var resp httpclient.ResponseWrapper + if (taskModel.HttpMethod == models.TaskHTTPMethodGet) { + resp = httpclient.Get(taskModel.Command, taskModel.Timeout) + } else { + urlFields := strings.Split(taskModel.Command, "?") + taskModel.Command = urlFields[0] + var params string + if (len(urlFields) >= 2) { + params = urlFields[1] + } + resp = httpclient.PostParams(taskModel.Command, params, taskModel.Timeout) + } // 返回状态码非200,均为失败 if resp.StatusCode != http.StatusOK { return resp.Body, errors.New(fmt.Sprintf("HTTP状态码非200-->%d", resp.StatusCode)) @@ -175,11 +198,11 @@ func (h *RPCHandler) Run(taskModel models.Task, taskUniqueId int64) (result stri taskRequest.Timeout = int32(taskModel.Timeout) taskRequest.Command = taskModel.Command taskRequest.Id = taskUniqueId - var resultChan chan TaskResult = make(chan TaskResult, len(taskModel.Hosts)) + resultChan := make(chan TaskResult, len(taskModel.Hosts)) for _, taskHost := range taskModel.Hosts { go func(th models.TaskHostDetail) { output, err := rpcClient.Exec(th.Name, th.Port, taskRequest) - var errorMessage string = "" + errorMessage := "" if err != nil { errorMessage = err.Error() } @@ -191,7 +214,7 @@ func (h *RPCHandler) Run(taskModel models.Task, taskUniqueId int64) (result stri } var aggregationErr error = nil - var aggregationResult string = "" + aggregationResult := "" for i := 0; i < len(taskModel.Hosts); i++ { taskResult := <-resultChan aggregationResult += taskResult.Result @@ -213,7 +236,7 @@ func createTaskLog(taskModel models.Task, status models.Status) (int64, error) { taskLogModel.Command = taskModel.Command taskLogModel.Timeout = taskModel.Timeout if taskModel.Protocol == models.TaskRPC { - var aggregationHost string = "" + aggregationHost := "" for _, host := range taskModel.Hosts { aggregationHost += fmt.Sprintf("%s-%s
", host.Alias, host.Name) } @@ -230,7 +253,7 @@ func createTaskLog(taskModel models.Task, status models.Status) (int64, error) { func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) { taskLogModel := new(models.TaskLog) var status models.Status - var result string = taskResult.Result + result := taskResult.Result if taskResult.Err != nil { status = models.Failure } else { @@ -245,7 +268,7 @@ func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) { } func createJob(taskModel models.Task) cron.FuncJob { - var handler Handler = createHandler(taskModel) + handler := createHandler(taskModel) if handler == nil { return nil } @@ -339,10 +362,9 @@ func execDependencyTask(taskModel models.Task, taskResult TaskResult) { if len(tasks) == 0 { logger.Errorf("依赖任务列表为空#主任务ID-%d", taskModel.Id) } - serviceTask := new(Task) for _, task := range tasks { task.Spec = fmt.Sprintf("依赖任务(主任务ID-%d)", taskModel.Id) - serviceTask.Run(task) + ServiceTask.Run(task) } } diff --git a/templates/task/log.html b/templates/task/log.html index 6ff61ab..ab95772 100644 --- a/templates/task/log.html +++ b/templates/task/log.html @@ -102,8 +102,10 @@ {{{end}}} - {{{if and (eq .Status 1) (eq .Protocol 2) }}} - + {{{if $.IsAdmin}}} + {{{if and (eq .Status 1) (eq .Protocol 2) }}} + + {{{end}}} {{{end}}} @@ -155,8 +157,8 @@ function stopTask(id, taskId) { util.confirm("确定要停止任务吗", function () { - util.post("/task/log/stop/", {id: id, task_id:taskId}, function () { - location.reload(); + util.post("/task/log/stop/", {id: id, task_id:taskId}, function (code, message) { + swal('提示', message, 'info'); }); }); } diff --git a/templates/task/task_form.html b/templates/task/task_form.html index f3a6f69..914c493 100644 --- a/templates/task/task_form.html +++ b/templates/task/task_form.html @@ -112,6 +112,16 @@ +
@@ -299,10 +309,12 @@ var protocol = $('#protocol').val(); if (protocol == 2) { $('#hostField').show(); + $('#http-method').hide(); return; } $('#hostField').hide(); + $('#http-method').show(); } $('.ui.checkbox')