From 93bb9f90b1ae77b29b1e21b874d484cc301f9909 Mon Sep 17 00:00:00 2001 From: ouqiang Date: Tue, 16 May 2017 11:03:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=B6=E9=97=B4=E8=BD=AE=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E5=8F=82=E6=95=B0=E4=BB=8E=E9=85=8D=E7=BD=AE=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E4=B8=AD=E8=AF=BB=E5=8F=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/web.go | 18 +++++++++++++++--- gocron.go | 2 +- modules/httpclient/http_client.go | 13 ++++++++++++- modules/notify/slack.go | 2 +- routers/delaytask/delay_task.go | 2 +- routers/install/install.go | 2 ++ routers/routers.go | 2 +- service/delay_task.go | 9 ++++----- 8 files changed, 37 insertions(+), 13 deletions(-) diff --git a/cmd/web.go b/cmd/web.go index cad092b..a9f5dd5 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -72,16 +72,26 @@ func initModule() { serviceTask := new(service.Task) serviceTask.Initialize() + // 初始化延时任务 delayTaskEnabled, err := config.Key("delay.task.enable").Bool() if err != nil { - logger.Error("获取延时任务配置失败", err) return } if !delayTaskEnabled { return } + delayTaskSlots, err := config.Key("delay.task.slots").Int() + if err != nil { + return + } + delayTaskTick := config.Key("delay.task.tick").String() + tick, err := time.ParseDuration(delayTaskTick) + if err != nil { + return + } + serviceDelayTask := new(service.DelayTask) - serviceDelayTask.Initialize() + serviceDelayTask.Initialize(tick, delayTaskSlots) } // 解析端口 @@ -135,12 +145,14 @@ func shutdown() { os.Exit(0) return } - logger.Info("应用准备退出, 停止任务调度") + logger.Info("应用准备退出") serviceTask := new(service.Task) // 停止所有任务调度 + logger.Info("停止定时任务调度") serviceTask.StopAll() delayTaskEnable, _ := app.Setting.Key("delay.task.enable").Bool() if delayTaskEnable { + logger.Info("停止延时任务调度") serviceDelayTask := new(service.DelayTask) serviceDelayTask.Stop() } diff --git a/gocron.go b/gocron.go index d585c50..d3131ec 100644 --- a/gocron.go +++ b/gocron.go @@ -13,7 +13,7 @@ import ( "github.com/ouqiang/gocron/cmd" ) -const AppVersion = "0.1" +const AppVersion = "0.2" func main() { app := cli.NewApp() diff --git a/modules/httpclient/http_client.go b/modules/httpclient/http_client.go index daf3234..36d5c19 100644 --- a/modules/httpclient/http_client.go +++ b/modules/httpclient/http_client.go @@ -25,7 +25,18 @@ func Get(url string, timeout int) ResponseWrapper { return request(req, timeout) } -func PostBody(url string, body string, timeout int) ResponseWrapper { +func PostParams(url string,params string, timeout int) ResponseWrapper { + buf := bytes.NewBufferString(params) + req, err := http.NewRequest("POST", url, buf) + if err != nil { + return createRequestError(err) + } + req.Header.Set("Content-type", "application/x-www-form-urlencoded") + + return request(req, timeout) +} + +func PostJson(url string, body string, timeout int) ResponseWrapper { buf := bytes.NewBufferString(body) req, err := http.NewRequest("POST", url, buf) if err != nil { diff --git a/modules/notify/slack.go b/modules/notify/slack.go index 226d77f..6c5f8e8 100644 --- a/modules/notify/slack.go +++ b/modules/notify/slack.go @@ -43,7 +43,7 @@ func (slack *Slack) send(msg Message, slackUrl string, channel string) { maxTimes := 3 i := 0 for i < maxTimes { - resp := httpclient.PostBody(slackUrl, formatBody, timeout) + resp := httpclient.PostJson(slackUrl, formatBody, timeout) if resp.StatusCode == 200 { break; } diff --git a/routers/delaytask/delay_task.go b/routers/delaytask/delay_task.go index 9bfd691..297dbc2 100644 --- a/routers/delaytask/delay_task.go +++ b/routers/delaytask/delay_task.go @@ -40,7 +40,7 @@ func Create(ctx *macaron.Context) string { json := utils.JsonResponse{} delayTaskEnabled, _ := app.Setting.Key("delay.task.enable").Bool() if !delayTaskEnabled { - return json.CommonFailure("未开启延时任务") + return json.CommonFailure("系统未开启延时任务") } if url == "" { return json.CommonFailure("url地址不能为空") diff --git a/routers/install/install.go b/routers/install/install.go index 1aa1a14..12beb33 100644 --- a/routers/install/install.go +++ b/routers/install/install.go @@ -114,6 +114,8 @@ func writeConfig(form InstallForm) error { "allow_ips" : "", "app.name": "定时任务管理系统", // 应用名称 "delay.task.enable": "false", // 是否开启延时任务 + "delay.task.slots": "3600", // 时间轮槽数量 + "delay.task.tick": "1s", // 时间轮每次转动的时间 } return setting.Write(dbConfig, app.AppConfig) diff --git a/routers/routers.go b/routers/routers.go index c5f9d00..ed0c5bc 100644 --- a/routers/routers.go +++ b/routers/routers.go @@ -98,7 +98,7 @@ func Register(m *macaron.Macaron) { m.Route("/tasklog/update-status", "GET,POST", tasklog.UpdateStatus) m.Post("/tasklog/remove/:id", tasklog.Remove) m.Post("/delaytask/push", delaytask.Create) - m.Post("/delaytask/remove/:id", delaytask.Remove) + m.Post("/delaytask/log/remove/:id", delaytask.Remove) }); // 404错误 diff --git a/service/delay_task.go b/service/delay_task.go index f15f11d..4942f24 100644 --- a/service/delay_task.go +++ b/service/delay_task.go @@ -15,8 +15,8 @@ var tw *timewheel.TimeWheel type DelayTask struct {} // 从数据库中取出所有延迟任务 -func (task *DelayTask) Initialize() { - tw = timewheel.New(1 * time.Second, 3600) +func (task *DelayTask) Initialize(tick time.Duration, slots int) { + tw = timewheel.New(tick, slots) tw.Start() taskModel := new(models.DelayTask) currentTime := time.Now() @@ -80,7 +80,7 @@ func (task *DelayTask) Run(id int64, url, params string) { success := false logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params) for i := 0; i < tryTimes; { - response := httpclient.PostBody(url, params, timeout) + response := httpclient.PostParams(url, params, timeout) if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{ success = true break; @@ -88,8 +88,7 @@ func (task *DelayTask) Run(id int64, url, params string) { i++ if i < tryTimes { logger.Errorf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s", - i,id,response.StatusCode,response.Body, - ) + i,id,response.StatusCode,response.Body) time.Sleep(30 * time.Second) } }