diff --git a/cmd/web.go b/cmd/web.go index 5e275a9..1e59ec1 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -4,7 +4,6 @@ import ( "github.com/ouqiang/gocron/models" "github.com/ouqiang/gocron/modules/app" "github.com/ouqiang/gocron/modules/logger" - "github.com/ouqiang/gocron/modules/rpc/grpcpool" "github.com/ouqiang/gocron/modules/setting" "github.com/ouqiang/gocron/routers" "github.com/ouqiang/gocron/service" @@ -13,7 +12,6 @@ import ( "os" "os/signal" "syscall" - "time" ) // web服务器默认端口 @@ -152,23 +150,7 @@ func shutdown() { serviceTask := new(service.Task) // 停止所有任务调度 logger.Info("停止定时任务调度") - serviceTask.StopAll() - - taskNumInRunning := service.TaskNum.Num() - logger.Infof("正在运行的任务有%d个", taskNumInRunning) - if taskNumInRunning > 0 { - logger.Info("等待所有任务执行完成后退出") - } - for { - if taskNumInRunning <= 0 { - break - } - time.Sleep(3 * time.Second) - taskNumInRunning = service.TaskNum.Num() - } - - // 释放gRPC连接池 - grpcpool.Pool.ReleaseAll() + serviceTask.Stop() } // 判断应用是否需要升级, 当存在版本号文件且版本小于app.VersionId时升级 diff --git a/modules/app/app.go b/modules/app/app.go index 1af491a..0a82a73 100644 --- a/modules/app/app.go +++ b/modules/app/app.go @@ -9,6 +9,7 @@ import ( "io/ioutil" "strconv" "strings" + "fmt" ) var ( @@ -35,7 +36,7 @@ func InitEnv(versionString string) { DataDir = AppDir + "/data" AppConfig = ConfDir + "/app.ini" VersionFile = ConfDir + "/.version" - checkDirExists(ConfDir, LogDir, DataDir) + createDirIfNeed(ConfDir, LogDir, DataDir) Installed = IsInstalled() VersionId = ToNumberVersion(versionString) } @@ -107,10 +108,13 @@ func ToNumberVersion(versionString string) int { } // 检测目录是否存在 -func checkDirExists(path ...string) { +func createDirIfNeed(path ...string) { for _, value := range path { if !utils.FileExist(value) { - logger.Fatal(value + "目录不存在或无权限访问") + err := os.Mkdir(value, 0755) + if err != nil { + logger.Fatal(fmt.Sprintf("创建目录失败:%s", err.Error())) + } } } } diff --git a/service/task.go b/service/task.go index 3c02620..a4a2884 100644 --- a/service/task.go +++ b/service/task.go @@ -23,61 +23,51 @@ var Cron *cron.Cron var runInstance Instance // 任务计数-正在运行中的任务 -var TaskNum TaskCount +var taskCount TaskCount // 任务计数 type TaskCount struct { - num int - sync.RWMutex + wg sync.WaitGroup + exit chan bool } -func (c *TaskCount) Add() { - c.Lock() - defer c.Unlock() - c.num += 1 +func (tc *TaskCount) Add() { + tc.wg.Add(1) } -func (c *TaskCount) Done() { - c.Lock() - defer c.Unlock() - c.num -= 1 +func (tc *TaskCount) Done() { + tc.wg.Done() } -func (c *TaskCount) Num() int { - c.RLock() - defer c.RUnlock() +func (tc *TaskCount) Exit() { + tc.wg.Done() + <-tc.exit +} - return c.num +func (tc *TaskCount) Wait() { + tc.Add() + tc.wg.Wait() + close(tc.exit) } // 任务ID作为Key type Instance struct { - Status map[int]bool - sync.RWMutex + m sync.Map } // 是否有任务处于运行中 func (i *Instance) has(key int) bool { - i.RLock() - defer i.RUnlock() - running, ok := i.Status[key] - if ok && running { - return true - } + _, ok := i.m.Load(key) - return false + return ok } func (i *Instance) add(key int) { - i.Lock() - defer i.Unlock() - i.Status[key] = true + i.m.Store(key, true) } func (i *Instance) done(key int) { - i.Lock() - defer i.Unlock() - delete(i.Status, key) + i.m.Delete(key) } type Task struct{} @@ -92,8 +82,9 @@ type TaskResult struct { func (task *Task) Initialize() { Cron = cron.New() Cron.Start() - runInstance = Instance{make(map[int]bool), sync.RWMutex{}} - TaskNum = TaskCount{0, sync.RWMutex{}} + runInstance = Instance{} + taskCount = TaskCount{sync.WaitGroup{}, make(chan bool)} + go taskCount.Wait() taskModel := new(models.Task) taskList, err := taskModel.ActiveList() @@ -137,8 +128,9 @@ func (task *Task) Add(taskModel models.Task) { } // 停止所有任务 -func (task *Task) StopAll() { +func (task *Task) Stop() { Cron.Stop() + taskCount.Exit() } // 直接运行任务 @@ -251,8 +243,8 @@ func createJob(taskModel models.Task) cron.FuncJob { return nil } taskFunc := func() { - TaskNum.Add() - defer TaskNum.Done() + taskCount.Add() + defer taskCount.Done() taskLogId := beforeExecJob(taskModel) if taskLogId <= 0 { return