diff --git a/modules/setting/setting.go b/modules/setting/setting.go index 9995550..7e8d541 100644 --- a/modules/setting/setting.go +++ b/modules/setting/setting.go @@ -33,6 +33,8 @@ type Setting struct { CAFile string CertFile string KeyFile string + + ConcurrencyQueue int } // 读取配置 @@ -61,6 +63,7 @@ func Read(filename string) (*Setting, error) { s.ApiKey = section.Key("api.key").MustString("") s.ApiSecret = section.Key("api.secret").MustString("") s.ApiSignEnable = section.Key("api.sign.enable").MustBool(true) + s.ConcurrencyQueue = section.Key("concurrency.queue").MustInt(1000) s.EnableTLS = section.Key("enable_tls").MustBool(false) s.CAFile = section.Key("ca_file").MustString("") diff --git a/routers/install/install.go b/routers/install/install.go index 15f71db..28f9845 100644 --- a/routers/install/install.go +++ b/routers/install/install.go @@ -121,6 +121,7 @@ func writeConfig(form InstallForm) error { "api.key", "", "api.secret", "", "enable_tls", "false", + "concurrency.queue", "1000", "ca_file", "", "cert_file", "", "key_file", "", diff --git a/service/task.go b/service/task.go index 7bf1ecb..47084ce 100644 --- a/service/task.go +++ b/service/task.go @@ -11,6 +11,7 @@ import ( "github.com/jakecoffman/cron" "github.com/ouqiang/gocron/models" + "github.com/ouqiang/gocron/modules/app" "github.com/ouqiang/gocron/modules/httpclient" "github.com/ouqiang/gocron/modules/logger" "github.com/ouqiang/gocron/modules/notify" @@ -31,13 +32,22 @@ var ( // 任务计数-正在运行的任务 taskCount TaskCount + + // 并发队列, 限制同时运行的任务数量 + concurrencyQueue ConcurrencyQueue ) -func init() { - serviceCron = cron.New() - serviceCron.Start() - taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)} - go taskCount.Wait() +// 并发队列 +type ConcurrencyQueue struct { + queue chan bool +} + +func (cq *ConcurrencyQueue) Push() { + cq.queue <- true +} + +func (cq *ConcurrencyQueue) Pop() { + <-cq.queue } // 任务计数 @@ -95,6 +105,12 @@ type TaskResult struct { // 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行 func (task Task) Initialize() { + serviceCron = cron.New() + serviceCron.Start() + concurrencyQueue = ConcurrencyQueue{queue: make(chan bool, app.Setting.ConcurrencyQueue)} + taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)} + go taskCount.Wait() + logger.Info("开始初始化定时任务") taskModel := new(models.Task) taskNum := 0 @@ -287,12 +303,22 @@ func createJob(taskModel models.Task) cron.FuncJob { return nil } taskFunc := func() { - taskCount.Add() - defer taskCount.Done() taskLogId := beforeExecJob(taskModel) if taskLogId <= 0 { return } + + taskCount.Add() + defer taskCount.Done() + + if taskModel.Multi == 0 { + runInstance.add(taskModel.Id) + defer runInstance.done(taskModel.Id) + } + + concurrencyQueue.Push() + defer concurrencyQueue.Pop() + logger.Infof("开始执行任务#%s#命令-%s", taskModel.Name, taskModel.Command) taskResult := execJob(handler, taskModel, taskLogId) logger.Infof("任务完成#%s#命令-%s", taskModel.Name, taskModel.Command) @@ -320,15 +346,11 @@ func beforeExecJob(taskModel models.Task) (taskLogId int64) { createTaskLog(taskModel, models.Cancel) return } - if taskModel.Multi == 0 { - runInstance.add(taskModel.Id) - } taskLogId, err := createTaskLog(taskModel, models.Running) if err != nil { logger.Error("任务开始执行#写入任务日志失败-", err) return } - logger.Debugf("任务命令-%s", taskModel.Command) return taskLogId @@ -420,9 +442,6 @@ func execJob(handler Handler, taskModel models.Task, taskUniqueId int64) TaskRes logger.Error("panic#service/task.go:execJob#", err) } }() - if taskModel.Multi == 0 { - defer runInstance.done(taskModel.Id) - } // 默认只运行任务一次 var execTimes int8 = 1 if taskModel.RetryTimes > 0 {