增加延迟任务

pull/21/merge
ouqiang 2017-05-14 22:09:36 +08:00
parent ce6ba171e7
commit 0148fd424b
16 changed files with 643 additions and 2 deletions

View File

@ -9,6 +9,7 @@
* crontab时间表达式精确到秒
* 任务执行失败重试设置
* 任务超时设置
* 延时任务
* 任务执行方式
* 调用本机系统命令
* 通过SSH执行远程命令

View File

@ -71,6 +71,17 @@ func initModule() {
// 初始化定时任务
serviceTask := new(service.Task)
serviceTask.Initialize()
delayTaskEnabled, err := config.Key("delay.task.enable").Bool()
if err != nil {
logger.Error("获取延时任务配置失败", err)
return
}
if !delayTaskEnabled {
return
}
serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Initialize()
}
// 解析端口
@ -128,6 +139,11 @@ func shutdown() {
serviceTask := new(service.Task)
// 停止所有任务调度
serviceTask.StopAll()
delayTaskEnable, _ := app.Setting.Key("delay.task.enable").Bool()
if delayTaskEnable {
serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Stop()
}
taskNumInRunning := service.TaskNum.Num()
logger.Infof("正在运行的任务有%d个", taskNumInRunning)
if taskNumInRunning > 0 {

85
models/delay_task.go Normal file
View File

@ -0,0 +1,85 @@
package models
import (
"time"
"github.com/go-xorm/xorm"
)
// 延迟任务
type DelayTask struct {
Id int64 `xorm:"bigint pk autoincr"`
Url string `xorm:"varchar(128) not null"`
Params string `xorm:"varchar(256) not null default '' "`
Delay int `xorm:"mediumint notnull default 0"` // 延迟时间
Status Status `xorm:"tinyint notnull index(u_status_created) default 5"` // 状态 0:执行失败 1:执行中 2:执行成功 5: 待执行
Created time.Time `xorm:"datetime notnull created index(u_status_created)"`
Updated time.Time `xorm:"datetime updated"`
BaseModel `xorm:"-"`
}
func (task *DelayTask) Create() (insertId int64, err error) {
_, err = Db.Insert(task)
if err == nil {
insertId = task.Id
}
return
}
// 获取所有待执行任务
func (task *DelayTask) ActiveList(endTime time.Time) ([]DelayTask, error) {
list := make([]DelayTask, 0)
fields := "id,url,params,delay,created"
err := Db.Where("status = ? AND created <= ?", Waiting, endTime.Format(DefaultTimeFormat)).Cols(fields).Limit(task.PageSize, task.pageLimitOffset()).Find(&list)
return list, err
}
// 获取待执行任务数量
func (task *DelayTask) ActiveNum(endTime time.Time) (int, error) {
count ,err := Db.Where("status = ? AND created <= ?", Waiting, endTime.Format(DefaultTimeFormat)).Count(task)
return int(count), err
}
func (task *DelayTask) List(params CommonMap) ([]DelayTask, error) {
task.parsePageAndPageSize(params)
list := make([]DelayTask, 0)
session := Db.Desc("id")
task.parseWhere(session, params)
err := session.Limit(task.PageSize, task.pageLimitOffset()).Find(&list)
return list, err
}
// 更新任务状态
func (task *DelayTask) UpdateStatus(id int64, status Status) (int64, error) {
return Db.Table(task).Id(id).Update(CommonMap{
"status": status,
})
}
// 解析where
func (task *DelayTask) parseWhere(session *xorm.Session, params CommonMap) {
if len(params) == 0 {
return
}
status, ok := params["Status"]
if ok && status.(int) > -1 {
session.And("status = ?", status)
}
}
// 删除N个月前的日志
func (task *DelayTask) Remove(id int) (int64, error) {
t := time.Now().AddDate(0, -id, 0)
return Db.Where("created <= ?", t.Format(DefaultTimeFormat)).Delete(task)
}
func (task *DelayTask) Total(params CommonMap) (int64, error) {
session := Db.NewSession()
task.parseWhere(session, params)
return session.Count(task)
}

View File

@ -15,7 +15,7 @@ func (migration *Migration) Exec(dbName string) error {
setting := new(Setting)
task := new(Task)
tables := []interface{}{
&User{}, task, &TaskLog{}, &Host{}, setting,&LoginLog{},
&User{}, task, &DelayTask{}, &TaskLog{}, &Host{}, setting,&LoginLog{},
}
for _, table := range tables {
exist, err:= Db.IsTableExist(table)

View File

@ -26,6 +26,7 @@ const (
Finish Status = 2 // 完成
Cancel Status = 3 // 取消
Background Status = 4 // 后台运行
Waiting Status = 5 // 等待中
)
const (

View File

@ -0,0 +1,110 @@
package delaytask
import (
"gopkg.in/macaron.v1"
"github.com/ouqiang/gocron/models"
"github.com/ouqiang/gocron/modules/utils"
"strings"
"github.com/ouqiang/gocron/service"
"github.com/ouqiang/gocron/modules/logger"
"github.com/Unknwon/paginater"
"fmt"
"github.com/ouqiang/gocron/routers/base"
"html/template"
"github.com/ouqiang/gocron/modules/app"
)
func Index(ctx *macaron.Context) {
delayTaskModel := new(models.DelayTask)
queryParams := parseQueryParams(ctx)
total, err := delayTaskModel.Total(queryParams)
tasks, err := delayTaskModel.List(queryParams)
if err != nil {
logger.Error(err)
}
PageParams := fmt.Sprintf("status=%d&page_size=%d",
queryParams["Status"], queryParams["PageSize"]);
queryParams["PageParams"] = template.URL(PageParams)
p := paginater.New(int(total), queryParams["PageSize"].(int), queryParams["Page"].(int), 5)
ctx.Data["Pagination"] = p
ctx.Data["Title"] = "延时任务列表"
ctx.Data["Tasks"] = tasks
ctx.Data["Params"] = queryParams
ctx.HTML(200, "task/delay_task")
}
func Create(ctx *macaron.Context) string {
url := ctx.QueryTrim("url")
params := ctx.QueryTrim("params")
delay := ctx.QueryInt("delay")
json := utils.JsonResponse{}
delayTaskEnabled, _ := app.Setting.Key("delay.task.enable").Bool()
if !delayTaskEnabled {
return json.CommonFailure("未开启延时任务")
}
if url == "" {
return json.CommonFailure("url地址不能为空")
}
lowerUrl := strings.ToLower(url)
if !strings.HasPrefix(lowerUrl, "http") &&
!strings.HasPrefix(lowerUrl, "https") {
return json.CommonFailure("无效的url地址")
}
if len(url) > 128 {
return json.CommonFailure("url长度不能超过128")
}
maxDelay := 1 << 31
if delay <= 0 || delay > maxDelay {
return json.CommonFailure("无效的delay, 取值范围1-(2^31-1)")
}
if len(params) > 256 {
return json.CommonFailure("params长度不能超过256")
}
delayTask := new(models.DelayTask)
delayTask.Url = url
delayTask.Params = params
delayTask.Delay = delay
delayTask.Status = models.Waiting
_, err := delayTask.Create()
if err != nil {
return json.CommonFailure("添加失败", err)
}
logger.Infof("新增延时任务#id-%d#url-%s#params-%s#delay-%d",
delayTask.Id, delayTask.Url, delayTask.Params, delayTask.Delay)
delayTaskService := new(service.DelayTask)
delayTaskService.Add(*delayTask)
return json.Success("添加成功", nil)
}
// 删除N个月前的日志
func Remove(ctx *macaron.Context) string {
month := ctx.ParamsInt(":id")
json := utils.JsonResponse{}
if month < 1 || month > 12 {
return json.CommonFailure("参数取值范围1-12")
}
delayTaskModel := new(models.DelayTask)
_, err := delayTaskModel.Remove(month)
if err != nil {
return json.CommonFailure("删除失败", err)
}
return json.Success("删除成功", nil)
}
// 解析查询参数
func parseQueryParams(ctx *macaron.Context) (models.CommonMap) {
var params models.CommonMap = models.CommonMap{}
status := ctx.QueryInt("status")
if status >=0 {
status -= 1
}
params["Status"] = status
base.ParsePageAndPageSize(ctx, params)
return params
}

View File

@ -113,6 +113,7 @@ func writeConfig(form InstallForm) error {
"db.charset": "utf8",
"allow_ips" : "",
"app.name": "定时任务管理系统", // 应用名称
"delay.task.enable": "false", // 是否开启延时任务
}
return setting.Write(dbConfig, app.AppConfig)

View File

@ -17,6 +17,7 @@ import (
"github.com/go-macaron/gzip"
"github.com/ouqiang/gocron/routers/manage"
"github.com/ouqiang/gocron/routers/loginlog"
"github.com/ouqiang/gocron/routers/delaytask"
)
// 静态文件目录
@ -43,7 +44,7 @@ func Register(m *macaron.Macaron) {
m.Post("/editPassword", user.UpdatePassword)
})
// 任务
// 定时任务
m.Group("/task", func() {
m.Get("/create", task.Create)
m.Post("/store", binding.Bind(task.TaskForm{}), task.Store)
@ -57,6 +58,11 @@ func Register(m *macaron.Macaron) {
m.Get("/run/:id", task.Run)
})
// 延时任务
m.Group("/delaytask", func() {
m.Get("", delaytask.Index)
})
// 主机
m.Group("/host", func() {
m.Get("/create", host.Create)
@ -91,6 +97,8 @@ func Register(m *macaron.Macaron) {
m.Group("/api/v1", func() {
m.Route("/tasklog/update-status", "GET,POST", tasklog.UpdateStatus)
m.Post("/tasklog/remove/:id", tasklog.Remove)
m.Post("/delaytask/create", delaytask.Create)
m.Post("/delaytask/remove/:id", delaytask.Remove)
});
// 404错误

111
service/delay_task.go Normal file
View File

@ -0,0 +1,111 @@
package service
import (
"github.com/ouqiang/gocron/models"
"time"
"github.com/ouqiang/gocron/modules/logger"
"math"
"github.com/ouqiang/gocron/modules/httpclient"
"strings"
"github.com/ouqiang/timewheel"
)
var tw *timewheel.TimeWheel
type DelayTask struct {}
// 从数据库中取出所有延迟任务
func (task *DelayTask) Initialize() {
tw = timewheel.New(1 * time.Second, 3600)
tw.Start()
taskModel := new(models.DelayTask)
currentTime := time.Now()
taskNum, err := taskModel.ActiveNum(currentTime)
if err != nil {
logger.Error("延迟任务初始化#获取待执行的任务失败", err)
return
}
if taskNum == 0 {
logger.Debugf("延迟任务初始化#待执行的任务数量为0")
return
}
pageSize := 100
totalPage := int( math.Ceil(float64(taskNum) / float64(pageSize)) )
logger.Infof("延迟任务初始化#待执行的任务数量-%d#共%d页#每页取%d条", taskNum, totalPage, pageSize)
taskModel.PageSize = pageSize
for page := 1; page <= totalPage; page++ {
taskModel.Page = page
logger.Debugf("延迟任务初始化#取出任务列表#第%d页", page)
taskList, err := taskModel.ActiveList(currentTime)
if err != nil {
logger.Error("延迟任务初始化#获取任务列表失败", err)
}
task.BatchAdd(taskList)
}
logger.Info("延迟任务初始化完成")
}
// 批量添加任务
func (task *DelayTask) BatchAdd(taskList []models.DelayTask) {
for _, item := range(taskList) {
task.Add(item)
}
}
// 添加任务
func (task *DelayTask) Add(taskModel models.DelayTask) {
currentTimestamp := time.Now().Unix()
execTimestamp := taskModel.Created.Unix() + int64(taskModel.Delay)
// 时间过期, 立即执行任务
if execTimestamp <= currentTimestamp {
go task.Run(taskModel.Id, taskModel.Url, taskModel.Params)
return
}
delay := execTimestamp - currentTimestamp
tw.Add(time.Duration(delay) * time.Second, func() {
task.Run(taskModel.Id, taskModel.Url, taskModel.Params)
})
}
// 运行任务
func (task *DelayTask) Run(id int64, url, params string) {
taskModel := new(models.DelayTask)
_, err := taskModel.UpdateStatus(id, models.Running)
if err != nil {
logger.Error("延迟任务开始执行#更新任务状态失败", err)
return
}
timeout := 300
tryTimes := 3
success := false
logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params)
for i := 0; i < tryTimes; {
response := httpclient.PostBody(url, params, timeout)
if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{
success = true
break;
}
i++
if i < tryTimes {
logger.Errorf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s",
i,id,response.StatusCode,response.Body,
)
time.Sleep(30 * time.Second)
}
}
logger.Infof("延迟任务执行完成#id-%d", id)
var status models.Status
if success {
status = models.Finish
} else {
status = models.Failure
}
_ ,err = taskModel.UpdateStatus(id, status)
if err != nil {
logger.Error("延迟任务执行完成#更新任务状态失败", err)
}
}
func (task *DelayTask) Stop() {
tw.Stop()
}

View File

@ -0,0 +1,84 @@
{{{ template "common/header" . }}}
<style type="text/css">
pre {
white-space: pre-wrap;
word-wrap: break-word;
padding:10px;
background-color: #4C4C4C;
color: white;
}
</style>
<div class="ui grid">
<!--the vertical menu-->
{{{ template "task/menu" . }}}
<div class="twelve wide column">
<div class="pageHeader">
<div class="segment">
<h3 class="ui dividing header">
<div class="content">
</div>
</h3>
</div>
</div>
<form class="ui form">
<div class="fields search">
<div class="field">
<select name="status">
<option value="0"></option>
<option value="1" {{{if eq .Params.Status 0}}}selected{{{end}}} ></option>
<option value="2" {{{if eq .Params.Status 1}}}selected{{{end}}}></option>
<option value="3" {{{if eq .Params.Status 2}}}selected{{{end}}}></option>
<option value="6" {{{if eq .Params.Status 5}}}selected{{{end}}}></option>
</select>
</div>
<div class="field">
<button class="ui linkedin submit button"></button>
</div>
</div>
</form>
<table class="ui pink table">
<thead>
<tr>
<th>ID</th>
<th>URL</th>
<th></th>
<th></th>
<th></th>
<th></th>
<th></th>
</tr>
</thead>
<tbody>
{{{range $i, $v := .Tasks}}}
<tr>
<td>{{{.Id}}}</td>
<td>{{{.Url}}}</td>
<td>{{{.Params}}}</td>
<td>{{{.Delay}}}</td>
<td>{{{.Created.Format "2006-01-02 15:04:05" }}}</td>
<td>
{{{if or (eq .Status 0) (eq .Status 2) }}}
{{{.Updated.Format "2006-01-02 15:04:05" }}}
{{{end}}}
</td>
<td>
{{{if eq .Status 2}}}
{{{else if eq .Status 1}}}
<span style="color:green"></span>
{{{else if eq .Status 0}}}
<span style="color:red"></span>
{{{else if eq .Status 5}}}
<span style="color:#43A102"></span>
{{{end}}}
</td>
</tr>
{{{end}}}
</tbody>
</table>
{{{ template "common/pagination" .}}}
</div>
</div>
{{{ template "common/footer" . }}}

37
test.go Normal file
View File

@ -0,0 +1,37 @@
package main
import (
"github.com/ouqiang/timewheel"
"time"
"fmt"
"github.com/ouqiang/gocron/models"
)
func main() {
// tick刻度为1秒, 3600个槽
tw := timewheel.New(1 * time.Second, 3600)
tw.Start()
t := time.Now()
tw.Add(5 * time.Second, func() {
fmt.Println("5分钟", time.Now(), t.Add(5 * time.Second).Format(models.DefaultTimeFormat))
})
tw.Add(10 * time.Second, func() {
fmt.Println("10分钟", time.Now(), t.Add(10 * time.Second).Format(models.DefaultTimeFormat))
})
tw.Add(35 * time.Second, func() {
fmt.Println("35分钟", time.Now(), t.Add(35 * time.Second).Format(models.DefaultTimeFormat))
})
tw.Add(178 * time.Second, func() {
fmt.Println("178分钟", time.Now(), t.Add(178 * time.Second).Format(models.DefaultTimeFormat))
})
tw.Add(27 * time.Second, func() {
fmt.Println("27分钟", time.Now(), t.Add(27 * time.Second).Format(models.DefaultTimeFormat))
})
tw.Add(78 * time.Second, func() {
fmt.Println("78分钟", time.Now(), t.Add(78 * time.Second).Format(models.DefaultTimeFormat))
})
tw.Add(3 * time.Second, func() {
fmt.Println("3分钟", time.Now(), t.Add(3 * time.Second).Format(models.DefaultTimeFormat))
})
select {}
}

21
vendor/github.com/ouqiang/timewheel/LICENSE generated vendored Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 qiang.ou
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

40
vendor/github.com/ouqiang/timewheel/README.md generated vendored Normal file
View File

@ -0,0 +1,40 @@
# timewheel
Golang实现的时间轮
![时间轮](https://raw.githubusercontent.com/ouqiang/timewheel/master/timewheel.jpg)
# 安装
```shell
go get -u github.com/ouqiang/timewheel
```
# 使用
```go
package main
import (
"github.com/ouqiang/timewheel"
"time"
)
func main() {
// tick刻度为1秒, 3600个槽
tw := timewheel.New(1 * time.Second, 3600)
tw.Start()
tw.Add(5 * time.Second, func() {
// do something
})
tw.Add(10 * time.Minute, func() {
// do something
})
tw.Add(35 * time.Hour, func() {
// do something
})
// 停止
tw.Stop()
}
```

120
vendor/github.com/ouqiang/timewheel/timewheel.go generated vendored Normal file
View File

@ -0,0 +1,120 @@
package timewheel
import (
"time"
"container/ring"
"container/list"
)
// @author qiang.ou<qingqianludao@gmail.com>
type TimeWheel struct {
interval time.Duration
ticker *time.Ticker
slots *ring.Ring
slotNum int
taskChannel chan Task
stopChannel chan bool
}
type Task struct {
delay time.Duration
circle int
job Job
}
type Job func()
func New(interval time.Duration, slotNum int) *TimeWheel {
if interval <= 0 || slotNum <= 0 {
return nil
}
tw := &TimeWheel{
interval: interval,
slots: ring.New(slotNum),
slotNum: slotNum,
taskChannel: make(chan Task),
stopChannel: make(chan bool),
}
tw.initSlots()
return tw
}
func (tw *TimeWheel) initSlots() {
for i := 0; i < tw.slots.Len(); i++ {
tw.slots.Value = list.New()
tw.slots = tw.slots.Next()
}
}
func (tw *TimeWheel) Start() {
tw.ticker = time.NewTicker(tw.interval)
go tw.start()
}
func (tw *TimeWheel) Add(delay time.Duration, job Job) {
if delay < 0 || job == nil {
return
}
tw.taskChannel <- Task{delay:delay, job: job}
}
func (tw *TimeWheel) Stop() {
tw.stopChannel <- true
}
func (tw *TimeWheel) start() {
for {
select {
case <- tw.ticker.C:
tw.tickHandler()
case task := <- tw.taskChannel:
tw.addTask(&task)
case <- tw.stopChannel:
tw.ticker.Stop()
return
}
}
}
func (tw *TimeWheel) tickHandler() {
l := tw.slots.Value.(*list.List)
tw.scanAndRunTask(l)
tw.slots = tw.slots.Next()
}
func (tw *TimeWheel) scanAndRunTask(l *list.List) {
for e := l.Front(); e != nil; {
task := e.Value.(*Task)
if task.circle > 0 {
task.circle--
e = e.Next()
continue
}
go task.job()
next := e.Next()
l.Remove(e)
e = next
}
}
func (tw *TimeWheel) addTask(task *Task) {
step, circle := tw.getStepAndCircle(task.delay)
task.circle = circle
l := tw.slots.Move(step).Value.(*list.List)
l.PushBack(task)
}
func (tw *TimeWheel) getStepAndCircle(d time.Duration) (step int, circle int) {
delaySeconds := int(d.Seconds())
intervalSeconds := int(tw.interval.Seconds())
circle = int(delaySeconds / intervalSeconds / tw.slotNum)
step = int(delaySeconds / intervalSeconds) % tw.slotNum
return
}

BIN
vendor/github.com/ouqiang/timewheel/timewheel.jpg generated vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 12 KiB

6
vendor/vendor.json vendored
View File

@ -132,6 +132,12 @@
"revision": "09cded8978dc9e80714c4d85b0322337b0a1e5e0",
"revisionTime": "2016-03-02T07:53:16Z"
},
{
"checksumSHA1": "kIFW+u9fHefC8sWE4W9pYIfJv5k=",
"path": "github.com/ouqiang/timewheel",
"revision": "c28ec761087c32fd75ad7514db2a4988d5c872d9",
"revisionTime": "2017-05-14T12:16:09Z"
},
{
"checksumSHA1": "1keN4Q9F8uHk/Gt5YXG0oCEecKM=",
"path": "github.com/urfave/cli",