gocron/service/task.go

259 lines
7.1 KiB
Go
Raw Normal View History

2017-03-10 09:24:06 +00:00
package service
import (
2017-04-07 01:22:00 +00:00
"github.com/ouqiang/gocron/models"
2017-04-02 02:38:49 +00:00
"io/ioutil"
"net/http"
"strconv"
"time"
2017-04-07 01:22:00 +00:00
"github.com/ouqiang/gocron/modules/logger"
"github.com/ouqiang/gocron/modules/ssh"
2017-04-13 09:35:59 +00:00
"github.com/jakecoffman/cron"
"github.com/ouqiang/gocron/modules/utils"
"errors"
"fmt"
2017-03-10 09:24:06 +00:00
)
2017-04-13 09:35:59 +00:00
// Cron is the package-level scheduler instance. Initialize assigns a new
// scheduler to it and starts it; Add registers and replaces jobs on it.
var Cron *cron.Cron
2017-04-02 02:19:52 +00:00
// Task is a field-less type whose methods load tasks, register them with the
// scheduler, and run them on demand.
type Task struct{}
2017-03-10 09:24:06 +00:00
// TaskResult carries the outcome of executing a task.
type TaskResult struct {
	// Result is the output returned by the task handler.
	Result string
	// Err is the error of the last attempt; nil when the task succeeded.
	Err error
	// RetryTimes is the retry count that gets written to the task log.
	RetryTimes int8
}
2017-03-24 09:55:44 +00:00
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
2017-04-02 02:19:52 +00:00
func (task *Task) Initialize() {
2017-04-13 09:35:59 +00:00
Cron = cron.New()
Cron.Start()
2017-04-02 02:38:49 +00:00
taskModel := new(models.Task)
taskList, err := taskModel.ActiveList()
if err != nil {
2017-04-03 07:27:19 +00:00
logger.Error("获取任务列表错误-", err.Error())
2017-04-02 02:38:49 +00:00
return
}
if len(taskList) == 0 {
2017-04-03 07:27:19 +00:00
logger.Debug("任务列表为空")
2017-04-02 02:38:49 +00:00
return
}
2017-04-20 01:36:42 +00:00
task.BatchAdd(taskList)
}
// 批量添加任务
func (task *Task) BatchAdd(tasks []models.TaskHost) {
for _, item := range tasks {
2017-04-02 02:38:49 +00:00
task.Add(item)
}
2017-03-10 09:24:06 +00:00
}
// 添加任务
func (task *Task) Add(taskModel models.TaskHost) {
taskFunc := createJob(taskModel)
2017-04-02 02:38:49 +00:00
if taskFunc == nil {
logger.Error("创建任务处理Job失败,不支持的任务协议#", taskModel.Protocol)
2017-04-02 02:38:49 +00:00
return
}
2017-04-13 09:35:59 +00:00
cronName := strconv.Itoa(taskModel.Id)
Cron.RemoveJob(cronName)
err := Cron.AddFunc(taskModel.Spec, taskFunc, cronName)
2017-04-14 10:05:34 +00:00
if err != nil {
logger.Error("添加任务到调度器失败#", err)
}
2017-03-10 09:24:06 +00:00
}
2017-04-21 05:36:45 +00:00
// 直接运行任务
func (task *Task) Run(taskModel models.TaskHost) {
go createJob(taskModel)()
2017-04-21 05:36:45 +00:00
}
2017-03-10 09:24:06 +00:00
type Handler interface {
Run(taskModel models.TaskHost) (string, error)
2017-03-10 09:24:06 +00:00
}
// 本地命令
2017-04-16 18:01:41 +00:00
type LocalCommandHandler struct {}
// 运行本地命令
2017-04-16 18:01:41 +00:00
func (h *LocalCommandHandler) Run(taskModel models.TaskHost) (string, error) {
if taskModel.Command == "" {
return "", errors.New("invalid command")
2017-04-16 18:01:41 +00:00
}
2017-04-20 01:36:42 +00:00
if utils.IsWindows() {
return h.runOnWindows(taskModel)
}
return h.runOnUnix(taskModel)
}
// runOnWindows runs the command through "cmd /C", passing the task's timeout
// to utils.ExecShellWithTimeout, and converts the output from GBK to UTF-8.
//
// The execution error is returned in every case. When the conversion fails,
// a fixed failure message is returned in place of the output.
func (h *LocalCommandHandler) runOnWindows(taskModel models.TaskHost) (string, error) {
	rawOutput, execErr := utils.ExecShellWithTimeout(taskModel.Timeout, "cmd", "/C", taskModel.Command)

	// Output on Windows is GBK-encoded and has to be UTF-8 before it can be
	// stored.
	converted, ok := utils.GBK2UTF8(rawOutput)
	if !ok {
		return "命令输出转换编码失败(gbk to utf8)", execErr
	}
	return converted, execErr
}
2017-04-20 01:36:42 +00:00
// 执行Unix命令
func (h *LocalCommandHandler) runOnUnix(taskModel models.TaskHost) (string, error) {
return utils.ExecShellWithTimeout(taskModel.Timeout, "/bin/bash", "-c", taskModel.Command)
2017-04-16 18:01:41 +00:00
}
2017-03-23 05:31:16 +00:00
// HTTP任务
2017-04-02 02:19:52 +00:00
type HTTPHandler struct{}
2017-03-10 09:24:06 +00:00
func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error) {
2017-04-02 02:38:49 +00:00
client := &http.Client{}
if taskModel.Timeout > 0 {
client.Timeout = time.Duration(taskModel.Timeout) * time.Second
}
2017-04-20 01:36:42 +00:00
req, err := http.NewRequest("GET", taskModel.Command, nil)
2017-04-02 02:38:49 +00:00
if err != nil {
logger.Error("任务处理#创建HTTP请求错误-", err.Error())
2017-04-02 02:38:49 +00:00
return
}
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
2017-04-07 01:22:00 +00:00
req.Header.Set("User-Agent", "golang/gocron")
2017-04-02 02:38:49 +00:00
resp, err := client.Do(req)
defer func() {
if resp != nil {
resp.Body.Close()
}
}()
if err != nil {
logger.Error("任务处理HTTP请求错误-", err.Error())
2017-04-02 02:38:49 +00:00
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
logger.Error("任务处理#读取HTTP请求返回值失败-", err.Error())
return
}
if resp.StatusCode != 200 {
2017-04-21 09:41:59 +00:00
return string(body), errors.New(fmt.Sprintf("HTTP状态码非200-->%d", resp.StatusCode))
2017-04-02 02:38:49 +00:00
}
return string(body), err
2017-03-10 09:24:06 +00:00
}
// SSH-command任务
2017-04-02 02:19:52 +00:00
type SSHCommandHandler struct{}
2017-03-10 09:24:06 +00:00
func (h *SSHCommandHandler) Run(taskModel models.TaskHost) (string, error) {
sshConfig := ssh.SSHConfig{
User: taskModel.Username,
Password: taskModel.Password,
Host: taskModel.Name,
Port: taskModel.Port,
ExecTimeout: taskModel.Timeout,
2017-04-20 01:36:42 +00:00
AuthType: taskModel.AuthType,
PrivateKey: taskModel.PrivateKey,
2017-04-02 02:38:49 +00:00
}
return ssh.Exec(sshConfig, taskModel.Command)
2017-03-23 05:31:16 +00:00
}
// 创建任务日志
2017-04-13 09:35:59 +00:00
func createTaskLog(taskModel models.TaskHost) (int64, error) {
2017-04-02 02:38:49 +00:00
taskLogModel := new(models.TaskLog)
2017-04-14 10:05:34 +00:00
taskLogModel.TaskId = taskModel.Id
taskLogModel.Name = taskModel.Task.Name
2017-04-06 06:40:48 +00:00
taskLogModel.Spec = taskModel.Spec
taskLogModel.Protocol = taskModel.Protocol
taskLogModel.Command = taskModel.Command
taskLogModel.Timeout = taskModel.Timeout
if taskModel.Protocol == models.TaskSSH {
taskLogModel.Hostname = taskModel.Alias + "-" + taskModel.Name
}
2017-04-02 02:38:49 +00:00
taskLogModel.StartTime = time.Now()
taskLogModel.Status = models.Running
insertId, err := taskLogModel.Create()
2017-03-23 05:31:16 +00:00
2017-04-02 02:38:49 +00:00
return insertId, err
2017-03-24 05:06:53 +00:00
}
// 更新任务日志
func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) {
2017-04-02 02:38:49 +00:00
taskLogModel := new(models.TaskLog)
var status models.Status
2017-04-21 09:41:59 +00:00
var result string = taskResult.Result
if taskResult.Err != nil {
result = taskResult.Err.Error() + " " + result
2017-04-02 02:38:49 +00:00
status = models.Failure
} else {
status = models.Finish
}
return taskLogModel.Update(taskLogId, models.CommonMap{
2017-04-21 09:41:59 +00:00
"retry_times": taskResult.RetryTimes,
2017-04-02 02:38:49 +00:00
"status": status,
"result": result,
})
2017-03-23 05:31:16 +00:00
}
func createJob(taskModel models.TaskHost) cron.FuncJob {
var handler Handler = createHandler(taskModel)
if handler == nil {
return nil
}
2017-04-02 02:38:49 +00:00
taskFunc := func() {
2017-04-06 06:40:48 +00:00
taskLogId, err := createTaskLog(taskModel)
2017-04-02 02:38:49 +00:00
if err != nil {
logger.Error("任务开始执行#写入任务日志失败-", err)
2017-04-02 02:38:49 +00:00
return
}
taskResult := execJob(handler, taskModel)
_, err = updateTaskLog(taskLogId, taskResult)
2017-04-02 02:38:49 +00:00
if err != nil {
logger.Error("任务结束#更新任务日志失败-", err)
2017-04-02 02:38:49 +00:00
}
}
return taskFunc
2017-04-02 02:19:52 +00:00
}
// createHandler returns the Handler for the task's protocol: HTTP, SSH or
// local command. It returns nil for any other protocol.
func createHandler(taskModel models.TaskHost) Handler {
	switch taskModel.Protocol {
	case models.TaskHTTP:
		return &HTTPHandler{}
	case models.TaskSSH:
		return &SSHCommandHandler{}
	case models.TaskLocalCommand:
		return &LocalCommandHandler{}
	}
	return nil
}
func execJob(handler Handler, taskModel models.TaskHost) TaskResult {
// 默认只运行任务一次
var execTimes int8 = 1
if (taskModel.RetryTimes > 0) {
execTimes += taskModel.RetryTimes
}
var i int8 = 0
var output string
var err error
for i < execTimes {
output, err = handler.Run(taskModel)
if err == nil {
return TaskResult{Result: output, Err: err, RetryTimes: i}
}
i++
// 重试规则每次递增1分钟
time.Sleep( time.Duration(i) * 60 * time.Second)
if i < execTimes {
2017-04-21 09:41:59 +00:00
logger.Warnf("任务执行失败#任务id-%d#重试第%d次#输出-%s#错误-%s", taskModel.Id, i, output, err.Error())
}
}
return TaskResult{Result: output, Err: err, RetryTimes: taskModel.RetryTimes}
}