新增配置项: 同时运行的任务数量

develop
ouqiang 2018-01-30 22:36:18 +08:00
parent 3ed64f9f22
commit e1fa3cf645
3 changed files with 37 additions and 14 deletions

View File

@ -33,6 +33,8 @@ type Setting struct {
CAFile string CAFile string
CertFile string CertFile string
KeyFile string KeyFile string
ConcurrencyQueue int
} }
// 读取配置 // 读取配置
@ -61,6 +63,7 @@ func Read(filename string) (*Setting, error) {
s.ApiKey = section.Key("api.key").MustString("") s.ApiKey = section.Key("api.key").MustString("")
s.ApiSecret = section.Key("api.secret").MustString("") s.ApiSecret = section.Key("api.secret").MustString("")
s.ApiSignEnable = section.Key("api.sign.enable").MustBool(true) s.ApiSignEnable = section.Key("api.sign.enable").MustBool(true)
s.ConcurrencyQueue = section.Key("concurrency.queue").MustInt(1000)
s.EnableTLS = section.Key("enable_tls").MustBool(false) s.EnableTLS = section.Key("enable_tls").MustBool(false)
s.CAFile = section.Key("ca_file").MustString("") s.CAFile = section.Key("ca_file").MustString("")

View File

@ -121,6 +121,7 @@ func writeConfig(form InstallForm) error {
"api.key", "", "api.key", "",
"api.secret", "", "api.secret", "",
"enable_tls", "false", "enable_tls", "false",
"concurrency.queue", "1000",
"ca_file", "", "ca_file", "",
"cert_file", "", "cert_file", "",
"key_file", "", "key_file", "",

View File

@ -11,6 +11,7 @@ import (
"github.com/jakecoffman/cron" "github.com/jakecoffman/cron"
"github.com/ouqiang/gocron/models" "github.com/ouqiang/gocron/models"
"github.com/ouqiang/gocron/modules/app"
"github.com/ouqiang/gocron/modules/httpclient" "github.com/ouqiang/gocron/modules/httpclient"
"github.com/ouqiang/gocron/modules/logger" "github.com/ouqiang/gocron/modules/logger"
"github.com/ouqiang/gocron/modules/notify" "github.com/ouqiang/gocron/modules/notify"
@ -31,13 +32,22 @@ var (
// 任务计数-正在运行的任务 // 任务计数-正在运行的任务
taskCount TaskCount taskCount TaskCount
// 并发队列, 限制同时运行的任务数量
concurrencyQueue ConcurrencyQueue
) )
func init() { // 并发队列
serviceCron = cron.New() type ConcurrencyQueue struct {
serviceCron.Start() queue chan bool
taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)} }
go taskCount.Wait()
func (cq *ConcurrencyQueue) Push() {
cq.queue <- true
}
func (cq *ConcurrencyQueue) Pop() {
<-cq.queue
} }
// 任务计数 // 任务计数
@ -95,6 +105,12 @@ type TaskResult struct {
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行 // 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
func (task Task) Initialize() { func (task Task) Initialize() {
serviceCron = cron.New()
serviceCron.Start()
concurrencyQueue = ConcurrencyQueue{queue: make(chan bool, app.Setting.ConcurrencyQueue)}
taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)}
go taskCount.Wait()
logger.Info("开始初始化定时任务") logger.Info("开始初始化定时任务")
taskModel := new(models.Task) taskModel := new(models.Task)
taskNum := 0 taskNum := 0
@ -287,12 +303,22 @@ func createJob(taskModel models.Task) cron.FuncJob {
return nil return nil
} }
taskFunc := func() { taskFunc := func() {
taskCount.Add()
defer taskCount.Done()
taskLogId := beforeExecJob(taskModel) taskLogId := beforeExecJob(taskModel)
if taskLogId <= 0 { if taskLogId <= 0 {
return return
} }
taskCount.Add()
defer taskCount.Done()
if taskModel.Multi == 0 {
runInstance.add(taskModel.Id)
defer runInstance.done(taskModel.Id)
}
concurrencyQueue.Push()
defer concurrencyQueue.Pop()
logger.Infof("开始执行任务#%s#命令-%s", taskModel.Name, taskModel.Command) logger.Infof("开始执行任务#%s#命令-%s", taskModel.Name, taskModel.Command)
taskResult := execJob(handler, taskModel, taskLogId) taskResult := execJob(handler, taskModel, taskLogId)
logger.Infof("任务完成#%s#命令-%s", taskModel.Name, taskModel.Command) logger.Infof("任务完成#%s#命令-%s", taskModel.Name, taskModel.Command)
@ -320,15 +346,11 @@ func beforeExecJob(taskModel models.Task) (taskLogId int64) {
createTaskLog(taskModel, models.Cancel) createTaskLog(taskModel, models.Cancel)
return return
} }
if taskModel.Multi == 0 {
runInstance.add(taskModel.Id)
}
taskLogId, err := createTaskLog(taskModel, models.Running) taskLogId, err := createTaskLog(taskModel, models.Running)
if err != nil { if err != nil {
logger.Error("任务开始执行#写入任务日志失败-", err) logger.Error("任务开始执行#写入任务日志失败-", err)
return return
} }
logger.Debugf("任务命令-%s", taskModel.Command) logger.Debugf("任务命令-%s", taskModel.Command)
return taskLogId return taskLogId
@ -420,9 +442,6 @@ func execJob(handler Handler, taskModel models.Task, taskUniqueId int64) TaskRes
logger.Error("panic#service/task.go:execJob#", err) logger.Error("panic#service/task.go:execJob#", err)
} }
}() }()
if taskModel.Multi == 0 {
defer runInstance.done(taskModel.Id)
}
// 默认只运行任务一次 // 默认只运行任务一次
var execTimes int8 = 1 var execTimes int8 = 1
if taskModel.RetryTimes > 0 { if taskModel.RetryTimes > 0 {