gocron/service/task.go

163 lines
3.9 KiB
Go
Raw Normal View History

2017-03-10 09:24:06 +00:00
package service
import (
2017-03-23 05:58:42 +00:00
"github.com/ouqiang/cron-scheduler/models"
"github.com/ouqiang/cron-scheduler/modules/utils"
2017-03-10 09:24:06 +00:00
"net/http"
"io/ioutil"
"strconv"
"time"
2017-03-23 05:58:42 +00:00
"github.com/ouqiang/cron-scheduler/modules/crontask"
2017-03-24 05:06:53 +00:00
"github.com/robfig/cron"
2017-03-24 09:55:44 +00:00
"github.com/ouqiang/cron-scheduler/modules/ansible"
"fmt"
2017-03-10 09:24:06 +00:00
)
// Task is the service type whose methods load tasks and register them with
// the scheduler.
type Task struct{}
2017-03-24 09:55:44 +00:00
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
func(task *Task) Initialize() {
2017-03-10 09:24:06 +00:00
taskModel := new(models.Task)
2017-03-24 09:55:44 +00:00
taskList, err := taskModel.ActiveList()
2017-03-10 09:24:06 +00:00
if err != nil {
utils.RecordLog("获取任务列表错误-", err.Error())
2017-03-24 05:06:53 +00:00
return
2017-03-10 09:24:06 +00:00
}
if len(taskList) == 0 {
utils.RecordLog("任务列表为空")
2017-03-24 05:06:53 +00:00
return
2017-03-10 09:24:06 +00:00
}
for _, item := range(taskList) {
task.Add(item)
}
2017-03-24 09:55:44 +00:00
crontask.DefaultCronTask.Run()
2017-03-10 09:24:06 +00:00
}
2017-03-24 09:55:44 +00:00
2017-03-10 09:24:06 +00:00
// 添加任务
2017-03-24 05:06:53 +00:00
func(task *Task) Add(taskModel models.Task) {
taskFunc := createHandlerJob(taskModel)
if taskFunc == nil {
utils.RecordLog("添加任务#不存在的任务协议编号", taskModel.Protocol)
return
2017-03-10 09:24:06 +00:00
}
2017-03-24 05:06:53 +00:00
// 定时任务
if taskModel.Type == models.Timing {
2017-03-24 09:55:44 +00:00
err := crontask.DefaultCronTask.AddOrReplace(strconv.Itoa(taskModel.Id), taskModel.Spec, taskFunc)
if err != nil {
utils.RecordLog(err)
}
2017-03-24 05:06:53 +00:00
} else if taskModel.Type == models.Delay {
// 延时任务
2017-03-24 09:55:44 +00:00
time.AfterFunc(time.Duration(taskModel.Delay) * time.Second, taskFunc)
2017-03-10 09:24:06 +00:00
}
}
type Handler interface {
2017-03-24 09:55:44 +00:00
Run(taskModel models.Task) (string, error)
2017-03-10 09:24:06 +00:00
}
2017-03-23 05:31:16 +00:00
// HTTP任务
2017-03-10 09:24:06 +00:00
// HTTPHandler is the Handler implementation that runs a task by issuing an
// HTTP request.
type HTTPHandler struct{}
2017-03-24 09:55:44 +00:00
func(h *HTTPHandler) Run(taskModel models.Task) (result string, err error) {
2017-03-10 09:24:06 +00:00
client := &http.Client{}
if (taskModel.Timeout > 0) {
client.Timeout = time.Duration(taskModel.Timeout) * time.Second
}
req, err := http.NewRequest("POST", taskModel.Command, nil)
if err != nil {
utils.RecordLog("创建HTTP请求错误-", err.Error())
return
}
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
req.Header.Set("User-Agent", "golang-cron/scheduler")
resp, err := client.Do(req)
2017-03-24 09:55:44 +00:00
defer func() {
if resp != nil {
resp.Body.Close()
}
}()
2017-03-10 09:24:06 +00:00
if err != nil {
utils.RecordLog("HTTP请求错误-", err.Error())
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
utils.RecordLog("读取HTTP请求返回值失败-", err.Error())
}
2017-03-24 09:55:44 +00:00
return string(body),err
2017-03-10 09:24:06 +00:00
}
2017-03-23 05:31:16 +00:00
// SSH任务
2017-03-10 09:24:06 +00:00
// SSHHandler is the Handler implementation that runs a task's command on its
// configured hosts through ansible.
type SSHHandler struct{}
2017-03-24 09:55:44 +00:00
func(ssh *SSHHandler) Run(taskModel models.Task) (string, error) {
var args []string = []string{
"-m", "shell",
"-a", taskModel.Command,
}
if (taskModel.Timeout > 0) {
// -B 异步执行超时时间, -P 轮询时间
args = append(args, "-B", strconv.Itoa(taskModel.Timeout), "-P", "10")
}
result, err := ansible.ExecCommand(taskModel.SshHosts, ansible.DefaultHosts.GetFilename(), args...)
2017-03-14 06:31:46 +00:00
2017-03-24 09:55:44 +00:00
return result, err
2017-03-23 05:31:16 +00:00
}
2017-03-24 09:55:44 +00:00
func createTaskLog(taskId int) (int, error) {
2017-03-24 05:06:53 +00:00
taskLogModel := new(models.TaskLog)
2017-03-24 09:55:44 +00:00
taskLogModel.TaskId = taskId
2017-03-24 05:06:53 +00:00
taskLogModel.StartTime = time.Now()
taskLogModel.Status = models.Running
insertId, err := taskLogModel.Create()
2017-03-23 05:31:16 +00:00
2017-03-24 05:06:53 +00:00
return insertId, err
}
2017-03-24 09:55:44 +00:00
func updateTaskLog(taskLogId int, result string, err error) (int64, error) {
fmt.Println(taskLogId)
2017-03-24 05:06:53 +00:00
taskLogModel := new(models.TaskLog)
2017-03-24 09:55:44 +00:00
var status models.Status
if err != nil {
result = err.Error() + " " + result
status = models.Failure
} else {
status = models.Finish
}
return taskLogModel.Update(taskLogId, models.CommonMap{
"status": status,
"result": result,
});
2017-03-23 05:31:16 +00:00
}
2017-03-24 05:06:53 +00:00
func createHandlerJob(taskModel models.Task) cron.FuncJob {
2017-03-24 09:55:44 +00:00
var handler Handler = nil
2017-03-24 05:06:53 +00:00
switch taskModel.Protocol {
case models.HTTP:
2017-03-24 09:55:44 +00:00
handler = new(HTTPHandler)
2017-03-24 05:06:53 +00:00
case models.SSH:
2017-03-24 09:55:44 +00:00
handler = new(SSHHandler)
}
taskFunc := func() {
taskLogId, err := createTaskLog(taskModel.Id)
if err != nil {
utils.RecordLog("写入任务日志失败-", err)
return
}
// err != nil 执行失败
result, err := handler.Run(taskModel)
_, err = updateTaskLog(int(taskLogId), result, err)
if err != nil {
utils.RecordLog("更新任务日志失败-", err)
}
2017-03-24 05:06:53 +00:00
}
return taskFunc
2017-03-24 09:55:44 +00:00
}