diff --git a/cmd/web.go b/cmd/web.go index 25b0783..4fb537a 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -7,8 +7,6 @@ import ( "github.com/go-macaron/session" "github.com/go-macaron/csrf" "github.com/ouqiang/cron-scheduler/modules/app" - "fmt" - "os" "github.com/ouqiang/cron-scheduler/routers" ) @@ -31,10 +29,7 @@ var CmdWeb = cli.Command{ } func run(ctx *cli.Context) { - // 检测环境 - app.CheckEnv() - // 启动定时任务 - runScheduler() + app.InitEnv() m := macaron.Classic() // 注册路由 routers.Register(m) @@ -44,12 +39,6 @@ func run(ctx *cli.Context) { m.Run(port) } -// 定时任务调度 -func runScheduler() { - fmt.Println("hello world") - os.Exit(1) -} - // 中间件注册 func registerMiddleware(m *macaron.Macaron) { m.Use(macaron.Logger()) diff --git a/main.go b/main.go index 71cf34d..a8ee150 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,7 @@ package main /*-------------------------------------------------------- 定时任务调度 兼容Linux crontab时间格式语法,最小粒度可精确到每秒 - 支持通过HTTP、SSH协议触发任务执行 + 支持通过HTTP、SSH协议任务执行 --------------------------------------------------------*/ import ( diff --git a/models/host.go b/models/host.go index 79e17d2..7d9a469 100644 --- a/models/host.go +++ b/models/host.go @@ -23,8 +23,13 @@ const ( ) // 新增 -func(host *Host) Create() (int64, error) { - return Db.Insert(host) +func(host *Host) Create() (insertId int16, err error) { + _, err = Db.Insert(host) + if err == nil { + insertId = host.Id + } + + return } // 更新 diff --git a/models/task.go b/models/task.go index 99c5bd9..3b78660 100644 --- a/models/task.go +++ b/models/task.go @@ -26,7 +26,8 @@ type Task struct { Protocol Protocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh Type TaskType `xorm:"tinyint notnull default 1"` // 任务类型 1: 定时任务 2: 延时任务 Command string `xorm:"varchar(512) notnull"` // URL地址或shell命令 - Timeout int `xorm:"mediumint notnull default 0"` // 定时任务:执行超时时间(单位秒),0不限制 延时任务: 延时timeout秒后执行 + Timeout int `xorm:"mediumint notnull default 0"` // 任务执行超时时间(单位秒),0不限制 + Delay int `xorm:"int notnull default 0"` // 延时任务,延时时间(单位秒) SshHosts string `xorm:"varchar(512) notnull defalut '' "` // SSH主机名, host id,逗号分隔 Remark string `xorm:"varchar(512) notnull default ''"` // 备注 Created time.Time `xorm:"datetime notnull created"` // 创建时间 @@ -37,10 +38,15 @@ type Task struct { } // 新增 -func(task *Task) Create() (int64, error) { +func(task *Task) Create() (insertId int, err error) { task.Status = Enabled - return Db.Insert(task) + _, err = Db.Insert(task) + if err == nil { + insertId = task.Id + } + + return } // 更新 @@ -63,6 +69,14 @@ func(task *Task) Enable(id int) (int64, error) { return task.Update(id, CommonMap{"status": Enabled}) } +func(task *Task) ActiveList() ([]Task, error) { + task.parsePageAndPageSize() + list := make([]Task, 0) + err := Db.Where("status = ?", Enabled).Desc("id").Find(&list) + + return list, err +} + func(task *Task) List() ([]Task, error) { task.parsePageAndPageSize() list := make([]Task, 0) diff --git a/models/task_log.go b/models/task_log.go index ce3c1dd..01ee06a 100644 --- a/models/task_log.go +++ b/models/task_log.go @@ -16,10 +16,15 @@ type TaskLog struct{ PageSize int `xorm:"-"` } -func(taskLog *TaskLog) Create() (int64, error) { +func(taskLog *TaskLog) Create() (insertId int, err error) { taskLog.Status = Running - return Db.Insert(taskLog) + _, err = Db.Insert(taskLog) + if err == nil { + insertId = taskLog.Id + } + + return } // 更新 diff --git a/models/user.go b/models/user.go index b8cb073..8d413c5 100644 --- a/models/user.go +++ b/models/user.go @@ -24,12 +24,17 @@ type User struct { } // 新增 -func(user *User) Create() (int64, error) { +func(user *User) Create() (insertId int, err error) { user.Status = Enabled user.Salt = user.generateSalt() user.Password = user.encryptPassword(user.Password, user.Salt) - return Db.Insert(user) + _, err = Db.Insert(user) + if err == nil { + insertId = user.Id + } + + return } // 更新 diff --git a/modules/ansible/ansible.go b/modules/ansible/ansible.go index 69e4749..2b1a40d 100644 --- a/modules/ansible/ansible.go +++ b/modules/ansible/ansible.go @@ -1,97 +1,29 @@ package ansible -// ansible ad-hoc playbook命令封装 +// ansible ad-hoc 命令封装 import ( - "os" "errors" - "gopkg.in/yaml.v2" - "io/ioutil" "github.com/ouqiang/cron-scheduler/modules/utils" ) -type Handler map[string]interface{} - -type Playbook struct { - Name string - Hosts string - Tasks []Handler - Handlers []Handler -} - -func(playbook *Playbook) SetHosts(hosts string) { - playbook.Hosts = hosts -} - -func(playbook *Playbook) SetName(name string) { - playbook.Name = name -} - -func(playbook *Playbook) AddTask(handler Handler) { - playbook.Tasks = append(playbook.Tasks, handler) -} - - -func(playbook *Playbook) AddHandler(handler Handler) { - playbook.Handlers = append(playbook.Handlers, handler) -} /** * 执行ad-hoc - * hosts 主机名 逗号分隔 + * hosts 主机名或主机别名 逗号分隔 + * hostFile 主机名文件 * module 调用模块 * args 传递给模块的参数 */ -func ExecCommand(hosts string, module string, args... string) (output string, err error) { - if hosts== "" || module == "" { +func ExecCommand(hosts string, hostFile string, args... string) (output string, err error) { + if hosts== "" || hostFile == "" || len(args) == 0 { err = errors.New("参数不完整") return } - hostFile := DefaultHosts.GetFilename() - commandArgs := []string{hosts, "-i", hostFile, "-m", module} - if len(args) != 0 { - commandArgs = append(commandArgs, "-a") - commandArgs = append(commandArgs, args...) - } + commandArgs := []string{hosts, "-i", hostFile} + commandArgs = append(commandArgs, args...) output, err = utils.ExecShell("ansible", commandArgs...) return -} - -// 执行playbook -func ExecPlaybook(playbook Playbook) (result string, err error) { - data, err := yaml.Marshal([]Playbook{playbook}) - if err != nil { - return - } - - playbookFile, err := ioutil.TempFile(GetTmpDir(), "playbook") - if err != nil { - return - } - hostFile := DefaultHosts.GetFilename() - defer func() { - playbookFile.Close() - os.Remove(playbookFile.Name()) - }() - _, err = playbookFile.Write(data) - if err != nil { - return - } - commandArgs := []string{"-i", hostFile, playbookFile.Name()} - result, err = utils.ExecShell("ansible-playbook", commandArgs...) - - return -} - -// 判断 获取临时目录,默认/dev/shm -func GetTmpDir() string { - dir := "/dev/shm" - _, err := os.Stat(dir) - if os.IsPermission(err) { - return "" - } - - return dir -} +} \ No newline at end of file diff --git a/modules/ansible/host.go b/modules/ansible/host.go index 132f860..d8572ff 100644 --- a/modules/ansible/host.go +++ b/modules/ansible/host.go @@ -19,6 +19,7 @@ type Hosts struct { func NewHosts(hostFilename string) *Hosts { h := &Hosts{sync.RWMutex{}, hostFilename} + h.Write() return h } diff --git a/modules/app/app.go b/modules/app/app.go index 84461a4..71442d3 100644 --- a/modules/app/app.go +++ b/modules/app/app.go @@ -8,6 +8,7 @@ import ( "github.com/ouqiang/cron-scheduler/modules/crontask" "github.com/ouqiang/cron-scheduler/models" "github.com/ouqiang/cron-scheduler/modules/ansible" + "github.com/ouqiang/cron-scheduler/service" ) var ( @@ -20,7 +21,8 @@ var ( Installed bool // 应用是否安装过 ) -func init() { +func InitEnv() { + CheckEnv() wd, err := os.Getwd() if err != nil { panic(err) @@ -79,14 +81,14 @@ func CreateInstallLock() error { // 初始化资源 func InitResource() { - // 初始化定时任务 - crontask.DefaultCronTask = crontask.NewCronTask() // 初始化DB models.Db = models.CreateDb(AppConfig) // 初始化ansible Hosts ansible.DefaultHosts = ansible.NewHosts(AnsibleHosts) - ansible.DefaultHosts.Write() - os.Exit(1) + // 初始化定时任务 + crontask.DefaultCronTask = crontask.NewCronTask() + serviceTask := new(service.Task) + serviceTask.Initialize() } // 检测目录是否存在 diff --git a/modules/crontask/cron_task.go b/modules/crontask/cron_task.go index 6346ab9..a322a23 100644 --- a/modules/crontask/cron_task.go +++ b/modules/crontask/cron_task.go @@ -34,10 +34,11 @@ func(cronTask *CronTask) Add(name string, spec string, cmd cron.FuncJob ) (err e return errors.New("任务已存在") } + spec = strings.TrimSpace(spec) cronTask.Lock() defer cronTask.Unlock() cronTask.tasks[name] = cron.New() - specs := strings.Split(spec, "\n") + specs := strings.Split(spec, "|||") for _, item := range(specs) { _, err = cron.Parse(item) if err != nil { @@ -93,7 +94,7 @@ func(cronTask *CronTask) Delete(name string) { } // 运行所有任务 -func(cronTask *CronTask) run() { +func(cronTask *CronTask) Run() { for _, cron := range cronTask.tasks { // cron内部有开启goroutine,此处不用新建goroutine cron.Start() diff --git a/routers/routers.go b/routers/routers.go index d728ae4..bb37a2b 100644 --- a/routers/routers.go +++ b/routers/routers.go @@ -1,13 +1,15 @@ package routers -import "gopkg.in/macaron.v1" +import ( + "gopkg.in/macaron.v1" +) // 路由注册 func Register(m *macaron.Macaron) { // 所有GET方法,自动注册HEAD方法 m.SetAutoHead(true) // 首页 - m.Get("/", func(ctx *macaron.Context) (string) { + m.Any("/", func(ctx *macaron.Context) (string) { return "go home" }) } \ No newline at end of file diff --git a/service/task.go b/service/task.go index 5d5d4d0..fd0f44d 100644 --- a/service/task.go +++ b/service/task.go @@ -9,15 +9,16 @@ import ( "time" "github.com/ouqiang/cron-scheduler/modules/crontask" "github.com/robfig/cron" - "errors" + "github.com/ouqiang/cron-scheduler/modules/ansible" + "fmt" ) type Task struct {} -// 初始化任务,从数据库取出所有任务添加到定时任务 -func(task *Task) Initialize() { +// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行 +func(task *Task) Initialize() { taskModel := new(models.Task) - taskList, err := taskModel.List() + taskList, err := taskModel.ActiveList() if err != nil { utils.RecordLog("获取任务列表错误-", err.Error()) return @@ -29,8 +30,11 @@ func(task *Task) Initialize() { for _, item := range(taskList) { task.Add(item) } + crontask.DefaultCronTask.Run() } + + // 添加任务 func(task *Task) Add(taskModel models.Task) { taskFunc := createHandlerJob(taskModel) @@ -40,21 +44,24 @@ func(task *Task) Add(taskModel models.Task) { } // 定时任务 if taskModel.Type == models.Timing { - crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc) + err := crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc) + if err != nil { + utils.RecordLog(err) + } } else if taskModel.Type == models.Delay { // 延时任务 - time.AfterFunc(time.Duration(taskModel.Timeout), taskFunc) + time.AfterFunc(time.Duration(taskModel.Delay) * time.Second, taskFunc) } } type Handler interface { - Run(taskModel models.Task) + Run(taskModel models.Task) (string, error) } // HTTP任务 type HTTPHandler struct {} -func(h *HTTPHandler) Run(taskModel models.Task) { +func(h *HTTPHandler) Run(taskModel models.Task) (result string, err error) { client := &http.Client{} if (taskModel.Timeout > 0) { client.Timeout = time.Duration(taskModel.Timeout) * time.Second @@ -68,7 +75,11 @@ func(h *HTTPHandler) Run(taskModel models.Task) { req.Header.Set("User-Agent", "golang-cron/scheduler") resp, err := client.Do(req) - defer resp.Body.Close() + defer func() { + if resp != nil { + resp.Body.Close() + } + }() if err != nil { utils.RecordLog("HTTP请求错误-", err.Error()) return @@ -78,18 +89,30 @@ func(h *HTTPHandler) Run(taskModel models.Task) { utils.RecordLog("读取HTTP请求返回值失败-", err.Error()) } + return string(body),err } // SSH任务 type SSHHandler struct {} -func(ssh *SSHHandler) Run(taskModel models.Task) { +func(ssh *SSHHandler) Run(taskModel models.Task) (string, error) { + var args []string = []string{ + "-m", "shell", + "-a", taskModel.Command, + } + if (taskModel.Timeout > 0) { + // -B 异步执行超时时间, -P 轮询时间 + args = append(args, "-B", strconv.Itoa(taskModel.Timeout), "-P", "10") + } + result, err := ansible.ExecCommand(taskModel.SshHosts, ansible.DefaultHosts.GetFilename(), args...) + + return result, err } -func createTaskLog(taskModel models.Task) (int64, error) { +func createTaskLog(taskId int) (int, error) { taskLogModel := new(models.TaskLog) - taskLogModel.TaskId = taskModel.Id + taskLogModel.TaskId = taskId taskLogModel.StartTime = time.Now() taskLogModel.Status = models.Running insertId, err := taskLogModel.Create() @@ -97,30 +120,44 @@ func createTaskLog(taskModel models.Task) (int64, error) { return insertId, err } -func updateTaskLog(taskModel models.Task, result string) { +func updateTaskLog(taskLogId int, result string, err error) (int64, error) { + fmt.Println(taskLogId) taskLogModel := new(models.TaskLog) - taskLogModel.TaskId= taskModel.Id - taskLogModel.StartTime = time.Now() + var status models.Status + if err != nil { + result = err.Error() + " " + result + status = models.Failure + } else { + status = models.Finish + } + return taskLogModel.Update(taskLogId, models.CommonMap{ + "status": status, + "result": result, + }); } func createHandlerJob(taskModel models.Task) cron.FuncJob { - var taskFunc cron.FuncJob = nil; + var handler Handler = nil switch taskModel.Protocol { case models.HTTP: - taskFunc = func() { - var handler Handler = new(HTTPHandler) - createTaskLog(taskModel) - handler.Run(taskModel) - } + handler = new(HTTPHandler) case models.SSH: - taskFunc = func() { - var handler Handler = new(SSHHandler) - createTaskLog(taskModel) - handler.Run(taskModel) - } + handler = new(SSHHandler) + } + taskFunc := func() { + taskLogId, err := createTaskLog(taskModel.Id) + if err != nil { + utils.RecordLog("写入任务日志失败-", err) + return + } + // err != nil 执行失败 + result, err := handler.Run(taskModel) + _, err = updateTaskLog(int(taskLogId), result, err) + if err != nil { + utils.RecordLog("更新任务日志失败-", err) + } } return taskFunc -} - +} \ No newline at end of file