From e11e8cc1cf2ff814416c4cfe22d63e70f7851f4b Mon Sep 17 00:00:00 2001 From: ouqiang Date: Fri, 24 Mar 2017 13:06:53 +0800 Subject: [PATCH] =?UTF-8?q?ansilbe=20host=E6=A8=A1=E5=9D=97=E9=87=8D?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 - cmd/web.go | 12 +----- conf/ansible_hosts.ini | 0 main.go | 2 +- models/model.go | 2 +- models/task.go | 9 ++++- modules/ansible/ansible.go | 14 +------ modules/ansible/host.go | 56 +++++++++++++------------- modules/app/app.go | 20 +++++---- modules/crontask/cron_task.go | 26 +++++++++--- routers/routers.go | 13 ++++++ service/task.go | 76 ++++++++++++++++++++++------------- 12 files changed, 132 insertions(+), 99 deletions(-) delete mode 100644 conf/ansible_hosts.ini create mode 100644 routers/routers.go diff --git a/.gitignore b/.gitignore index 62633b8..b667ebe 100644 --- a/.gitignore +++ b/.gitignore @@ -28,5 +28,4 @@ data/* log/* conf/install.lock conf/app.ini -conf/ansible_hosts.ini public/js/vue.js diff --git a/cmd/web.go b/cmd/web.go index 4e12761..25b0783 100644 --- a/cmd/web.go +++ b/cmd/web.go @@ -9,6 +9,7 @@ import ( "github.com/ouqiang/cron-scheduler/modules/app" "fmt" "os" + "github.com/ouqiang/cron-scheduler/routers" ) // web服务器默认端口 @@ -36,7 +37,7 @@ func run(ctx *cli.Context) { runScheduler() m := macaron.Classic() // 注册路由 - registerRouter(m) + routers.Register(m) // 注册中间件 registerMiddleware(m) port := parsePort(ctx) @@ -49,15 +50,6 @@ func runScheduler() { os.Exit(1) } -// 路由注册 -func registerRouter(m *macaron.Macaron) { - // 所有GET方法,自动注册HEAD方法 - m.SetAutoHead(true) - m.Get("/", func(ctx *macaron.Context) (string) { - return "go home" - }) -} - // 中间件注册 func registerMiddleware(m *macaron.Macaron) { m.Use(macaron.Logger()) diff --git a/conf/ansible_hosts.ini b/conf/ansible_hosts.ini deleted file mode 100644 index e69de29..0000000 diff --git a/main.go b/main.go index 11d5bb4..71cf34d 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,7 @@ const AppVersion = "0.0.1" func main() { app := cli.NewApp() app.Name = "cron-scheduler" - app.Usage = "crons-scheduler service" + app.Usage = "cron-scheduler service" app.Version = AppVersion app.Commands = []cli.Command{ cmd.CmdWeb, diff --git a/models/model.go b/models/model.go index 4984876..034d298 100644 --- a/models/model.go +++ b/models/model.go @@ -25,7 +25,7 @@ const ( const ( Page = 1 // 当前页数 PageSize = 20 // 每页多少条数据 - MaxPageSize = 1000 // 每次最多取多少条 + MaxPageSize = 100000 // 每次最多取多少条 ) // 创建Db diff --git a/models/task.go b/models/task.go index 404bd69..99c5bd9 100644 --- a/models/task.go +++ b/models/task.go @@ -6,18 +6,25 @@ import ( type Protocol int8 +type TaskType int8 + const ( HTTP Protocol = 1 SSH Protocol = 2 ) +const ( + Timing TaskType = 1 + Delay TaskType = 2 +) + // 任务 type Task struct { Id int `xorm:"int pk autoincr"` Name string `xorm:"varchar(64) notnull"` // 任务名称 Spec string `xorm:"varchar(64) notnull"` // crontab 时间格式 Protocol Protocol `xorm:"tinyint notnull"` // 协议 1:http 2:ssh - Type int8 `xorm:"tinyint notnull default 1"` // 任务类型 1: 定时任务 2: 延时任务 + Type TaskType `xorm:"tinyint notnull default 1"` // 任务类型 1: 定时任务 2: 延时任务 Command string `xorm:"varchar(512) notnull"` // URL地址或shell命令 Timeout int `xorm:"mediumint notnull default 0"` // 定时任务:执行超时时间(单位秒),0不限制 延时任务: 延时timeout秒后执行 SshHosts string `xorm:"varchar(512) notnull defalut '' "` // SSH主机名, host id,逗号分隔 diff --git a/modules/ansible/ansible.go b/modules/ansible/ansible.go index 71f7065..69e4749 100644 --- a/modules/ansible/ansible.go +++ b/modules/ansible/ansible.go @@ -48,13 +48,7 @@ func ExecCommand(hosts string, module string, args... string) (output string, er err = errors.New("参数不完整") return } - hostFile, err := DefaultHosts.GetHostFile() - if err != nil { - return - } - defer func() { - os.Remove(hostFile) - }() + hostFile := DefaultHosts.GetFilename() commandArgs := []string{hosts, "-i", hostFile, "-m", module} if len(args) != 0 { commandArgs = append(commandArgs, "-a") @@ -76,14 +70,10 @@ func ExecPlaybook(playbook Playbook) (result string, err error) { if err != nil { return } - hostFile, err := DefaultHosts.GetHostFile() - if err != nil { - return - } + hostFile := DefaultHosts.GetFilename() defer func() { playbookFile.Close() os.Remove(playbookFile.Name()) - os.Remove(hostFile) }() _, err = playbookFile.Write(data) if err != nil { diff --git a/modules/ansible/host.go b/modules/ansible/host.go index ad8eef4..132f860 100644 --- a/modules/ansible/host.go +++ b/modules/ansible/host.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "bytes" "strconv" + "github.com/ouqiang/cron-scheduler/modules/utils" ) // 主机名 @@ -13,27 +14,38 @@ var DefaultHosts *Hosts type Hosts struct { sync.RWMutex - hosts []models.Host + filename string } -func(h *Hosts) Get() []models.Host { - h.RLock() - defer h.RUnlock() +func NewHosts(hostFilename string) *Hosts { + h := &Hosts{sync.RWMutex{}, hostFilename} - return h.hosts -} - -func(h *Hosts) Set(hostsModel []models.Host) { - h.Lock() - defer h.Unlock() - - h.hosts = hostsModel + return h } // 获取hosts文件名 -func(h *Hosts) GetHostFile() (filename string ,err error) { +func(h *Hosts) GetFilename() string { + h.RLock() + defer h.RUnlock() + + return h.filename +} + + +// 写入hosts +func(h *Hosts) Write() { + host := new(models.Host) + hostModels, err := host.List() + if err != nil { + utils.RecordLog(err) + return + } + if len(hostModels) == 0 { + utils.RecordLog("hosts内容为空") + return + } buffer := bytes.Buffer{} - for _, hostModel := range(h.hosts) { + for _, hostModel := range(hostModels) { buffer.WriteString(strconv.Itoa(int(hostModel.Id))) buffer.WriteString(" ansible_ssh_host=") buffer.WriteString(hostModel.Name) @@ -47,19 +59,9 @@ func(h *Hosts) GetHostFile() (filename string ,err error) { } buffer.WriteString("\n") } - tmpFile, err := ioutil.TempFile(GetTmpDir(), "host") - if err != nil { - return - } - - defer func() { - tmpFile.Close() - }() - - _, err = tmpFile.WriteString(buffer.String()) - if err == nil { - filename = tmpFile.Name() - } + h.Lock() + defer h.Unlock() + err = ioutil.WriteFile(h.filename, buffer.Bytes(), 0644) return } diff --git a/modules/app/app.go b/modules/app/app.go index a5b5f55..84461a4 100644 --- a/modules/app/app.go +++ b/modules/app/app.go @@ -5,9 +5,9 @@ import ( "runtime" "github.com/ouqiang/cron-scheduler/modules/utils" - "github.com/ouqiang/cron-scheduler/modules/ansible" "github.com/ouqiang/cron-scheduler/modules/crontask" "github.com/ouqiang/cron-scheduler/models" + "github.com/ouqiang/cron-scheduler/modules/ansible" ) var ( @@ -36,7 +36,7 @@ func init() { os.Setenv("ANSIBLE_CONFIG", ConfDir) Installed = IsInstalled() if Installed { - initResource() + InitResource() } } @@ -78,17 +78,15 @@ func CreateInstallLock() error { // 初始化资源 -func initResource() { +func InitResource() { + // 初始化定时任务 crontask.DefaultCronTask = crontask.NewCronTask() + // 初始化DB models.Db = models.CreateDb(AppConfig) - ansible.DefaultHosts = &ansible.Hosts{} - hostModel := new(models.Host) - hosts, err := hostModel.List() - if err != nil { - utils.RecordLog(err) - } else { - ansible.DefaultHosts.Set(hosts) - } + // 初始化ansible Hosts + ansible.DefaultHosts = ansible.NewHosts(AnsibleHosts) + ansible.DefaultHosts.Write() + os.Exit(1) } // 检测目录是否存在 diff --git a/modules/crontask/cron_task.go b/modules/crontask/cron_task.go index 7eff1a5..6346ab9 100644 --- a/modules/crontask/cron_task.go +++ b/modules/crontask/cron_task.go @@ -4,24 +4,29 @@ import ( "github.com/robfig/cron" "errors" "sync" + "strings" ) var DefaultCronTask *CronTask +type CronMap map[string]*cron.Cron + type CronTask struct { sync.RWMutex - tasks map[string]*cron.Cron + tasks CronMap } func NewCronTask() *CronTask { return &CronTask { sync.RWMutex{}, - make(map[string]*cron.Cron), + make(CronMap), } } // 新增定时任务,如果name存在,则添加失败 -func(cronTask *CronTask) Add(name string, spec string, cmd func() ) error { +// name 任务名称 +// spec crontab时间格式定义 可定义多个时间\n分隔 +func(cronTask *CronTask) Add(name string, spec string, cmd cron.FuncJob ) (err error) { if name == "" || spec == "" || cmd == nil { return errors.New("参数不完整") } @@ -32,13 +37,22 @@ func(cronTask *CronTask) Add(name string, spec string, cmd func() ) error { cronTask.Lock() defer cronTask.Unlock() cronTask.tasks[name] = cron.New() - err := cronTask.tasks[name].AddFunc(spec, cmd) + specs := strings.Split(spec, "\n") + for _, item := range(specs) { + _, err = cron.Parse(item) + if err != nil { + return err + } + } + for _, item := range(specs) { + err = cronTask.tasks[name].AddFunc(item, cmd) + } return err } -// 任务不存在则新增,任务已存在则替换任务 -func(cronTask *CronTask) addOrReplace(name string, spec string, cmd func() ) error { +// 任务不存在则新增,任务已存在则删除后新增 +func(cronTask *CronTask) AddOrReplace(name string, spec string, cmd cron.FuncJob) error { if cronTask.IsExist(name) { cronTask.Delete(name) } diff --git a/routers/routers.go b/routers/routers.go new file mode 100644 index 0000000..d728ae4 --- /dev/null +++ b/routers/routers.go @@ -0,0 +1,13 @@ +package routers + +import "gopkg.in/macaron.v1" + +// 路由注册 +func Register(m *macaron.Macaron) { + // 所有GET方法,自动注册HEAD方法 + m.SetAutoHead(true) + // 首页 + m.Get("/", func(ctx *macaron.Context) (string) { + return "go home" + }) +} \ No newline at end of file diff --git a/service/task.go b/service/task.go index 0c076cf..5d5d4d0 100644 --- a/service/task.go +++ b/service/task.go @@ -8,6 +8,8 @@ import ( "strconv" "time" "github.com/ouqiang/cron-scheduler/modules/crontask" + "github.com/robfig/cron" + "errors" ) type Task struct {} @@ -18,9 +20,11 @@ func(task *Task) Initialize() { taskList, err := taskModel.List() if err != nil { utils.RecordLog("获取任务列表错误-", err.Error()) + return } if len(taskList) == 0 { utils.RecordLog("任务列表为空") + return } for _, item := range(taskList) { task.Add(item) @@ -28,24 +32,18 @@ func(task *Task) Initialize() { } // 添加任务 -func(task *Task) Add(taskModel models.Task) { - var taskFunc func() = nil; - switch taskModel.Protocol { - case models.HTTP: - taskFunc = func() { - var handler Handler = new(HTTPHandler) - handler.Run(taskModel) - } - case models.SSH: - taskFunc = func() { - var handler Handler = new(SSHHandler) - handler.Run(taskModel) - } - default: - utils.RecordLog("任务协议不存在-协议编号: ", taskModel.Protocol) +func(task *Task) Add(taskModel models.Task) { + taskFunc := createHandlerJob(taskModel) + if taskFunc == nil { + utils.RecordLog("添加任务#不存在的任务协议编号", taskModel.Protocol) + return } - if (taskFunc != nil) { - crontask.DefaultCronTask.Add(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc) + // 定时任务 + if taskModel.Type == models.Timing { + crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc) + } else if taskModel.Type == models.Delay { + // 延时任务 + time.AfterFunc(time.Duration(taskModel.Timeout), taskFunc) } } @@ -80,15 +78,6 @@ func(h *HTTPHandler) Run(taskModel models.Task) { utils.RecordLog("读取HTTP请求返回值失败-", err.Error()) } - _, err = taskModel.Update( - taskModel.Id, - models.CommonMap{ - "status": 0, - "result" : string(body), - }); - if err != nil { - utils.RecordLog("更新任务日志失败-", err.Error()) - } } // SSH任务 @@ -98,11 +87,40 @@ func(ssh *SSHHandler) Run(taskModel models.Task) { } -// 延时任务 -type DelayHandler struct {} +func createTaskLog(taskModel models.Task) (int64, error) { + taskLogModel := new(models.TaskLog) + taskLogModel.TaskId = taskModel.Id + taskLogModel.StartTime = time.Now() + taskLogModel.Status = models.Running + insertId, err := taskLogModel.Create() -func (handler *DelayHandler) Run(taskModel models.Task) { + return insertId, err +} + +func updateTaskLog(taskModel models.Task, result string) { + taskLogModel := new(models.TaskLog) + taskLogModel.TaskId= taskModel.Id + taskLogModel.StartTime = time.Now() } +func createHandlerJob(taskModel models.Task) cron.FuncJob { + var taskFunc cron.FuncJob = nil; + switch taskModel.Protocol { + case models.HTTP: + taskFunc = func() { + var handler Handler = new(HTTPHandler) + createTaskLog(taskModel) + handler.Run(taskModel) + } + case models.SSH: + taskFunc = func() { + var handler Handler = new(SSHHandler) + createTaskLog(taskModel) + handler.Run(taskModel) + } + } + + return taskFunc +}