时间轮启动参数从配置文件中读取

pull/21/merge v0.2
ouqiang 2017-05-16 11:03:28 +08:00
parent a7fd364b7d
commit 93bb9f90b1
8 changed files with 37 additions and 13 deletions

View File

@ -72,16 +72,26 @@ func initModule() {
serviceTask := new(service.Task) serviceTask := new(service.Task)
serviceTask.Initialize() serviceTask.Initialize()
// 初始化延时任务
delayTaskEnabled, err := config.Key("delay.task.enable").Bool() delayTaskEnabled, err := config.Key("delay.task.enable").Bool()
if err != nil { if err != nil {
logger.Error("获取延时任务配置失败", err)
return return
} }
if !delayTaskEnabled { if !delayTaskEnabled {
return return
} }
delayTaskSlots, err := config.Key("delay.task.slots").Int()
if err != nil {
return
}
delayTaskTick := config.Key("delay.task.tick").String()
tick, err := time.ParseDuration(delayTaskTick)
if err != nil {
return
}
serviceDelayTask := new(service.DelayTask) serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Initialize() serviceDelayTask.Initialize(tick, delayTaskSlots)
} }
// 解析端口 // 解析端口
@ -135,12 +145,14 @@ func shutdown() {
os.Exit(0) os.Exit(0)
return return
} }
logger.Info("应用准备退出, 停止任务调度") logger.Info("应用准备退出")
serviceTask := new(service.Task) serviceTask := new(service.Task)
// 停止所有任务调度 // 停止所有任务调度
logger.Info("停止定时任务调度")
serviceTask.StopAll() serviceTask.StopAll()
delayTaskEnable, _ := app.Setting.Key("delay.task.enable").Bool() delayTaskEnable, _ := app.Setting.Key("delay.task.enable").Bool()
if delayTaskEnable { if delayTaskEnable {
logger.Info("停止延时任务调度")
serviceDelayTask := new(service.DelayTask) serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Stop() serviceDelayTask.Stop()
} }

View File

@ -13,7 +13,7 @@ import (
"github.com/ouqiang/gocron/cmd" "github.com/ouqiang/gocron/cmd"
) )
const AppVersion = "0.1" const AppVersion = "0.2"
func main() { func main() {
app := cli.NewApp() app := cli.NewApp()

View File

@ -25,7 +25,18 @@ func Get(url string, timeout int) ResponseWrapper {
return request(req, timeout) return request(req, timeout)
} }
func PostBody(url string, body string, timeout int) ResponseWrapper { func PostParams(url string,params string, timeout int) ResponseWrapper {
buf := bytes.NewBufferString(params)
req, err := http.NewRequest("POST", url, buf)
if err != nil {
return createRequestError(err)
}
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
return request(req, timeout)
}
func PostJson(url string, body string, timeout int) ResponseWrapper {
buf := bytes.NewBufferString(body) buf := bytes.NewBufferString(body)
req, err := http.NewRequest("POST", url, buf) req, err := http.NewRequest("POST", url, buf)
if err != nil { if err != nil {

View File

@ -43,7 +43,7 @@ func (slack *Slack) send(msg Message, slackUrl string, channel string) {
maxTimes := 3 maxTimes := 3
i := 0 i := 0
for i < maxTimes { for i < maxTimes {
resp := httpclient.PostBody(slackUrl, formatBody, timeout) resp := httpclient.PostJson(slackUrl, formatBody, timeout)
if resp.StatusCode == 200 { if resp.StatusCode == 200 {
break; break;
} }

View File

@ -40,7 +40,7 @@ func Create(ctx *macaron.Context) string {
json := utils.JsonResponse{} json := utils.JsonResponse{}
delayTaskEnabled, _ := app.Setting.Key("delay.task.enable").Bool() delayTaskEnabled, _ := app.Setting.Key("delay.task.enable").Bool()
if !delayTaskEnabled { if !delayTaskEnabled {
return json.CommonFailure("未开启延时任务") return json.CommonFailure("系统未开启延时任务")
} }
if url == "" { if url == "" {
return json.CommonFailure("url地址不能为空") return json.CommonFailure("url地址不能为空")

View File

@ -114,6 +114,8 @@ func writeConfig(form InstallForm) error {
"allow_ips" : "", "allow_ips" : "",
"app.name": "定时任务管理系统", // 应用名称 "app.name": "定时任务管理系统", // 应用名称
"delay.task.enable": "false", // 是否开启延时任务 "delay.task.enable": "false", // 是否开启延时任务
"delay.task.slots": "3600", // 时间轮槽数量
"delay.task.tick": "1s", // 时间轮每次转动的时间
} }
return setting.Write(dbConfig, app.AppConfig) return setting.Write(dbConfig, app.AppConfig)

View File

@ -98,7 +98,7 @@ func Register(m *macaron.Macaron) {
m.Route("/tasklog/update-status", "GET,POST", tasklog.UpdateStatus) m.Route("/tasklog/update-status", "GET,POST", tasklog.UpdateStatus)
m.Post("/tasklog/remove/:id", tasklog.Remove) m.Post("/tasklog/remove/:id", tasklog.Remove)
m.Post("/delaytask/push", delaytask.Create) m.Post("/delaytask/push", delaytask.Create)
m.Post("/delaytask/remove/:id", delaytask.Remove) m.Post("/delaytask/log/remove/:id", delaytask.Remove)
}); });
// 404错误 // 404错误

View File

@ -15,8 +15,8 @@ var tw *timewheel.TimeWheel
type DelayTask struct {} type DelayTask struct {}
// 从数据库中取出所有延迟任务 // 从数据库中取出所有延迟任务
func (task *DelayTask) Initialize() { func (task *DelayTask) Initialize(tick time.Duration, slots int) {
tw = timewheel.New(1 * time.Second, 3600) tw = timewheel.New(tick, slots)
tw.Start() tw.Start()
taskModel := new(models.DelayTask) taskModel := new(models.DelayTask)
currentTime := time.Now() currentTime := time.Now()
@ -80,7 +80,7 @@ func (task *DelayTask) Run(id int64, url, params string) {
success := false success := false
logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params) logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params)
for i := 0; i < tryTimes; { for i := 0; i < tryTimes; {
response := httpclient.PostBody(url, params, timeout) response := httpclient.PostParams(url, params, timeout)
if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{ if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{
success = true success = true
break; break;
@ -88,8 +88,7 @@ func (task *DelayTask) Run(id int64, url, params string) {
i++ i++
if i < tryTimes { if i < tryTimes {
logger.Errorf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s", logger.Errorf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s",
i,id,response.StatusCode,response.Body, i,id,response.StatusCode,response.Body)
)
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
} }
} }