refactor(延迟任务): 删除延迟任务模块

当任务较多时, 频繁读写数据库,数据库压力大, 计划拆分为独立项目, 用Redis实现持久化

BREAKING CHANGE: 不再支持延迟任务
pull/21/merge
ouqiang 2017-06-21 19:03:22 +08:00
parent eb02804aec
commit b8f13b4b0e
14 changed files with 6 additions and 469 deletions

5
.gitignore vendored
View File

@ -34,4 +34,7 @@ public/resource/javascript/vue.js
gocron
gocron.exe
gocron-node
gocron-node.exe
gocron-node.exe
node_modules
package.json

View File

@ -9,7 +9,6 @@
* crontab时间表达式精确到秒
* 任务执行失败重试设置
* 任务超时设置
* 延时任务
* 任务依赖配置
* 任务类型
* shell任务

View File

@ -121,27 +121,6 @@ func initModule() {
// 初始化定时任务
serviceTask := new(service.Task)
serviceTask.Initialize()
// 初始化延时任务
delayTaskEnabled, err := config.Key("delay.task.enable").Bool()
if err != nil {
return
}
if !delayTaskEnabled {
return
}
delayTaskSlots, err := config.Key("delay.task.slots").Int()
if err != nil {
return
}
delayTaskTick := config.Key("delay.task.tick").String()
tick, err := time.ParseDuration(delayTaskTick)
if err != nil {
return
}
serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Initialize(tick, delayTaskSlots)
}
// 解析端口
@ -229,12 +208,6 @@ func shutdown() {
// 停止所有任务调度
logger.Info("停止定时任务调度")
serviceTask.StopAll()
delayTaskEnable, _ := app.Setting.Key("delay.task.enable").Bool()
if delayTaskEnable {
logger.Info("停止延时任务调度")
serviceDelayTask := new(service.DelayTask)
serviceDelayTask.Stop()
}
// 释放gRPC连接池
grpcpool.Pool.ReleaseAll()

View File

@ -1,85 +0,0 @@
package models
import (
"time"
"github.com/go-xorm/xorm"
)
// 延迟任务
type DelayTask struct {
Id int64 `xorm:"bigint pk autoincr"`
Url string `xorm:"varchar(128) not null"`
Params string `xorm:"varchar(256) not null default '' "`
Delay int `xorm:"mediumint notnull default 0"` // 延迟时间
Status Status `xorm:"tinyint notnull index(u_status_created) default 5"` // 状态 0:执行失败 1:执行中 2:执行成功 5: 待执行
Created time.Time `xorm:"datetime notnull created index(u_status_created)"`
Updated time.Time `xorm:"datetime updated"`
BaseModel `xorm:"-"`
}
func (task *DelayTask) Create() (insertId int64, err error) {
_, err = Db.Insert(task)
if err == nil {
insertId = task.Id
}
return
}
// 获取所有待执行任务
func (task *DelayTask) ActiveList(endTime time.Time) ([]DelayTask, error) {
list := make([]DelayTask, 0)
fields := "id,url,params,delay,created"
err := Db.Where("status IN (?, ?) AND created <= ?", Waiting, Running, endTime.Format(DefaultTimeFormat)).Cols(fields).Limit(task.PageSize, task.pageLimitOffset()).Find(&list)
return list, err
}
// 获取待执行任务数量
func (task *DelayTask) ActiveNum(endTime time.Time) (int, error) {
count ,err := Db.Where("status IN (?, ?) AND created <= ?", Waiting, Running, endTime.Format(DefaultTimeFormat)).Count(task)
return int(count), err
}
func (task *DelayTask) List(params CommonMap) ([]DelayTask, error) {
task.parsePageAndPageSize(params)
list := make([]DelayTask, 0)
session := Db.Desc("id")
task.parseWhere(session, params)
err := session.Limit(task.PageSize, task.pageLimitOffset()).Find(&list)
return list, err
}
// 更新任务状态
func (task *DelayTask) UpdateStatus(id int64, status Status) (int64, error) {
return Db.Table(task).Id(id).Update(CommonMap{
"status": status,
})
}
// 解析where
func (task *DelayTask) parseWhere(session *xorm.Session, params CommonMap) {
if len(params) == 0 {
return
}
status, ok := params["Status"]
if ok && status.(int) > -1 {
session.And("status = ?", status)
}
}
// 删除N个月前的日志
func (task *DelayTask) Remove(id int) (int64, error) {
t := time.Now().AddDate(0, -id, 0)
return Db.Where("created <= ?", t.Format(DefaultTimeFormat)).Delete(task)
}
func (task *DelayTask) Total(params CommonMap) (int64, error) {
session := Db.NewSession()
task.parseWhere(session, params)
return session.Count(task)
}

View File

@ -15,7 +15,7 @@ func (migration *Migration) Exec(dbName string) error {
setting := new(Setting)
task := new(Task)
tables := []interface{}{
&User{}, task, &DelayTask{}, &TaskLog{}, &Host{}, setting,&LoginLog{},
&User{}, task, &TaskLog{}, &Host{}, setting,&LoginLog{},
}
for _, table := range tables {
exist, err:= Db.IsTableExist(table)

View File

@ -26,7 +26,7 @@ function Util() {
// ajax错误处理
util.ajaxFailure = function() {
// todo 错误处理
swal(FAILURE_MESSAGE, '', 'error');
swal(FAILURE_MESSAGE, '', 'error');
};
// get请求
util.get = function(url, callback) {

View File

@ -1,110 +0,0 @@
package delaytask
import (
"gopkg.in/macaron.v1"
"github.com/ouqiang/gocron/models"
"github.com/ouqiang/gocron/modules/utils"
"strings"
"github.com/ouqiang/gocron/service"
"github.com/ouqiang/gocron/modules/logger"
"github.com/Unknwon/paginater"
"fmt"
"github.com/ouqiang/gocron/routers/base"
"html/template"
"github.com/ouqiang/gocron/modules/app"
)
func Index(ctx *macaron.Context) {
delayTaskModel := new(models.DelayTask)
queryParams := parseQueryParams(ctx)
total, err := delayTaskModel.Total(queryParams)
tasks, err := delayTaskModel.List(queryParams)
if err != nil {
logger.Error(err)
}
PageParams := fmt.Sprintf("status=%d&page_size=%d",
queryParams["Status"], queryParams["PageSize"]);
queryParams["PageParams"] = template.URL(PageParams)
p := paginater.New(int(total), queryParams["PageSize"].(int), queryParams["Page"].(int), 5)
ctx.Data["Pagination"] = p
ctx.Data["Title"] = "延时任务列表"
ctx.Data["Tasks"] = tasks
ctx.Data["Params"] = queryParams
ctx.HTML(200, "task/delay_task")
}
func Create(ctx *macaron.Context) string {
url := ctx.QueryTrim("url")
params := ctx.QueryTrim("params")
delay := ctx.QueryInt("delay")
json := utils.JsonResponse{}
delayTaskEnabled, _ := app.Setting.Key("delay.task.enable").Bool()
if !delayTaskEnabled {
return json.CommonFailure("系统未开启延时任务")
}
if url == "" {
return json.CommonFailure("url地址不能为空")
}
lowerUrl := strings.ToLower(url)
if !strings.HasPrefix(lowerUrl, "http") &&
!strings.HasPrefix(lowerUrl, "https") {
return json.CommonFailure("无效的url地址")
}
if len(url) > 128 {
return json.CommonFailure("url长度不能超过128")
}
maxDelay := 1 << 31
if delay <= 0 || delay > maxDelay {
return json.CommonFailure("无效的delay, 取值范围1-(2^31-1)")
}
if len(params) > 256 {
return json.CommonFailure("params长度不能超过256")
}
delayTask := new(models.DelayTask)
delayTask.Url = url
delayTask.Params = params
delayTask.Delay = delay
delayTask.Status = models.Waiting
_, err := delayTask.Create()
if err != nil {
return json.CommonFailure("添加失败", err)
}
logger.Infof("新增延时任务#id-%d#url-%s#params-%s#delay-%d",
delayTask.Id, delayTask.Url, delayTask.Params, delayTask.Delay)
delayTaskService := new(service.DelayTask)
delayTaskService.Add(*delayTask)
return json.Success("添加成功", nil)
}
// 删除N个月前的日志
func Remove(ctx *macaron.Context) string {
month := ctx.ParamsInt(":id")
json := utils.JsonResponse{}
if month < 1 || month > 12 {
return json.CommonFailure("参数取值范围1-12")
}
delayTaskModel := new(models.DelayTask)
_, err := delayTaskModel.Remove(month)
if err != nil {
return json.CommonFailure("删除失败", err)
}
return json.Success("删除成功", nil)
}
// 解析查询参数
func parseQueryParams(ctx *macaron.Context) (models.CommonMap) {
var params models.CommonMap = models.CommonMap{}
status := ctx.QueryInt("status")
if status >=0 {
status -= 1
}
params["Status"] = status
base.ParsePageAndPageSize(ctx, params)
return params
}

View File

@ -115,9 +115,6 @@ func writeConfig(form InstallForm) error {
"db.max.open.conns", "100",
"allow_ips", "",
"app.name", "定时任务管理系统", // 应用名称
"delay.task.enable", "false", // 是否开启延时任务
"delay.task.slots", "3600", // 时间轮槽数量
"delay.task.tick", "1s", // 时间轮每次转动的时间
"api.key", "",
"api.secret", "",
}

View File

@ -17,7 +17,6 @@ import (
"github.com/go-macaron/gzip"
"github.com/ouqiang/gocron/routers/manage"
"github.com/ouqiang/gocron/routers/loginlog"
"github.com/ouqiang/gocron/routers/delaytask"
"time"
"strconv"
)
@ -60,11 +59,6 @@ func Register(m *macaron.Macaron) {
m.Get("/run/:id", task.Run)
})
// 延时任务
m.Group("/delaytask", func() {
m.Get("", delaytask.Index)
})
// 主机
m.Group("/host", func() {
m.Get("/create", host.Create)
@ -98,8 +92,6 @@ func Register(m *macaron.Macaron) {
// API
m.Group("/api/v1", func() {
m.Post("/tasklog/remove/:id", tasklog.Remove)
m.Post("/delaytask/push", delaytask.Create)
m.Post("/delaytask/log/remove/:id", delaytask.Remove)
}, apiAuth);
// 404错误

View File

@ -1,133 +0,0 @@
package service
import (
"github.com/ouqiang/gocron/models"
"time"
"github.com/ouqiang/gocron/modules/logger"
"math"
"github.com/ouqiang/gocron/modules/httpclient"
"strings"
"github.com/ouqiang/timewheel"
"fmt"
"github.com/ouqiang/gocron/modules/app"
)
var tw *timewheel.TimeWheel
type DelayTask struct {}
// 从数据库中取出所有延迟任务
func (task *DelayTask) Initialize(tick time.Duration, slots int) {
tw = timewheel.New(tick, slots, task.Run)
tw.Start()
taskModel := new(models.DelayTask)
currentTime := time.Now()
taskNum, err := taskModel.ActiveNum(currentTime)
if err != nil {
logger.Error("延迟任务初始化#获取待执行的任务失败", err)
return
}
if taskNum == 0 {
logger.Debugf("延迟任务初始化#待执行的任务数量为0")
return
}
pageSize := 100
totalPage := int( math.Ceil(float64(taskNum) / float64(pageSize)) )
logger.Infof("延迟任务初始化#待执行的任务数量-%d#共%d页#每页取%d条", taskNum, totalPage, pageSize)
taskModel.PageSize = pageSize
for page := 1; page <= totalPage; page++ {
taskModel.Page = page
logger.Debugf("延迟任务初始化#取出任务列表#第%d页", page)
taskList, err := taskModel.ActiveList(currentTime)
if err != nil {
logger.Error("延迟任务初始化#获取任务列表失败", err)
}
task.BatchAdd(taskList)
}
logger.Info("延迟任务初始化完成")
}
// 批量添加任务
func (task *DelayTask) BatchAdd(taskList []models.DelayTask) {
for _, item := range(taskList) {
task.Add(item)
}
}
// 添加任务
func (task *DelayTask) Add(taskModel models.DelayTask) {
currentTimestamp := time.Now().Unix()
execTimestamp := taskModel.Created.Unix() + int64(taskModel.Delay)
// 时间过期, 立即执行任务
data := []interface{}{taskModel.Id, taskModel.Url, taskModel.Params}
if execTimestamp <= currentTimestamp {
go task.Run(data)
return
}
delay := execTimestamp - currentTimestamp
tw.Add(time.Duration(delay) * time.Second, data)
}
// 运行任务
func (task *DelayTask) Run(data []interface{}) {
if len(data) < 3 {
logger.Errorf("延时任务开始执行#参数不足#%+v", data)
return
}
id := data[0].(int64)
url := data[1].(string)
params := data[2].(string)
if id <= 0 || url == "" {
logger.Errorf("延时任务开始执行#参数为空#%+v", data)
return
}
taskModel := new(models.DelayTask)
_, err := taskModel.UpdateStatus(id, models.Running)
if err != nil {
logger.Error("延迟任务开始执行#更新任务状态失败", err)
return
}
timeout := 300
tryTimes := 3
success := false
logger.Infof("延迟任务开始执行#id-%d#url-%s#params-%s", id, url, params)
for i := 0; i < tryTimes; {
response := httpclient.PostParams(url, params, timeout)
if response.StatusCode == 200 && strings.TrimSpace(response.Body) == "success"{
success = true
break;
}
i++
if i < tryTimes {
msg := fmt.Sprintf("延迟任务执行失败#重试第%d次#任务Id-%d#HTTP状态码-%d#HTTP-BODY-%s",
i,id,response.StatusCode,response.Body)
logger.Error(msg)
FailureNotify(msg)
time.Sleep(30 * time.Second)
}
}
logger.Infof("延迟任务执行完成#id-%d", id)
var status models.Status
if success {
status = models.Finish
} else {
status = models.Failure
}
_ ,err = taskModel.UpdateStatus(id, status)
if err != nil {
logger.Error("延迟任务执行完成#更新任务状态失败", err)
}
}
func (task *DelayTask) Stop() {
tw.Stop()
}
func FailureNotify(message string) {
notifyUrl := app.Setting.Key("delay.task.failure.notify.url").String()
notifyUrl = strings.TrimSpace(notifyUrl)
if notifyUrl != "" {
params := fmt.Sprintf("error=%s", message)
httpclient.PostParams(notifyUrl, params, 60)
}
}

View File

@ -69,12 +69,6 @@
el: '.ui.striped.table',
methods: {
ping: function(id) {
swal({
title: '',
text: "连接中.......",
type: 'info',
showConfirmButton: false
});
util.get("/host/ping/" + id, function(code, message) {
swal('', '', 'success');
})

View File

@ -90,12 +90,6 @@
$('.ui.form').form(
{
onSuccess: function(event, fields) {
swal({
title: '',
text: "系统安装中.......",
type: 'info',
showConfirmButton: false
});
util.post('/install/store', fields, function(code, message) {
swal('');
setTimeout(function() {

View File

@ -1,84 +0,0 @@
{{{ template "common/header" . }}}
<style type="text/css">
pre {
white-space: pre-wrap;
word-wrap: break-word;
padding:10px;
background-color: #4C4C4C;
color: white;
}
</style>
<div class="ui grid">
<!--the vertical menu-->
{{{ template "task/menu" . }}}
<div class="twelve wide column">
<div class="pageHeader">
<div class="segment">
<h3 class="ui dividing header">
<div class="content">
</div>
</h3>
</div>
</div>
<form class="ui form">
<div class="fields search">
<div class="field">
<select name="status">
<option value="0"></option>
<option value="1" {{{if eq .Params.Status 0}}}selected{{{end}}} ></option>
<option value="2" {{{if eq .Params.Status 1}}}selected{{{end}}}></option>
<option value="3" {{{if eq .Params.Status 2}}}selected{{{end}}}></option>
<option value="6" {{{if eq .Params.Status 5}}}selected{{{end}}}></option>
</select>
</div>
<div class="field">
<button class="ui linkedin submit button"></button>
</div>
</div>
</form>
<table class="ui pink table">
<thead>
<tr>
<th>ID</th>
<th>URL</th>
<th></th>
<th></th>
<th></th>
<th></th>
<th></th>
</tr>
</thead>
<tbody>
{{{range $i, $v := .Tasks}}}
<tr>
<td>{{{.Id}}}</td>
<td>{{{.Url}}}</td>
<td>{{{.Params}}}</td>
<td>{{{.Delay}}}</td>
<td>{{{.Created.Format "2006-01-02 15:04:05" }}}</td>
<td>
{{{if or (eq .Status 0) (eq .Status 2) }}}
{{{.Updated.Format "2006-01-02 15:04:05" }}}
{{{end}}}
</td>
<td>
{{{if eq .Status 2}}}
{{{else if eq .Status 1}}}
<span style="color:green"></span>
{{{else if eq .Status 0}}}
<span style="color:red"></span>
{{{else if eq .Status 5}}}
<span style="color:#43A102"></span>
{{{end}}}
</td>
</tr>
{{{end}}}
</tbody>
</table>
{{{ template "common/pagination" .}}}
</div>
</div>
{{{ template "common/footer" . }}}

View File

@ -7,9 +7,6 @@
<a class="item {{{if eq .URI "/task/log"}}}active teal{{{end}}} " href="/task/log">
<i class="bar chart icon"></i>
</a>
<a class="item {{{if eq .URI "/delaytask"}}}active teal{{{end}}} " href="/delaytask">
<i class="bar chart icon"></i>
</a>
</div>
</div>
</div>