2017-03-10 09:24:06 +00:00
|
|
|
|
package service
|
|
|
|
|
|
|
|
|
|
import (
|
2017-04-07 01:22:00 +00:00
|
|
|
|
"github.com/ouqiang/gocron/models"
|
2017-04-02 02:38:49 +00:00
|
|
|
|
"strconv"
|
|
|
|
|
"time"
|
2017-04-07 01:22:00 +00:00
|
|
|
|
"github.com/ouqiang/gocron/modules/logger"
|
2017-04-10 09:37:16 +00:00
|
|
|
|
"github.com/ouqiang/gocron/modules/ssh"
|
2017-04-13 09:35:59 +00:00
|
|
|
|
"github.com/jakecoffman/cron"
|
2017-04-17 08:13:43 +00:00
|
|
|
|
"github.com/ouqiang/gocron/modules/utils"
|
|
|
|
|
"errors"
|
2017-04-21 06:50:40 +00:00
|
|
|
|
"fmt"
|
2017-04-28 03:54:46 +00:00
|
|
|
|
"github.com/ouqiang/gocron/modules/httpclient"
|
|
|
|
|
"github.com/ouqiang/gocron/modules/notify"
|
2017-03-10 09:24:06 +00:00
|
|
|
|
)
|
|
|
|
|
|
2017-04-13 09:35:59 +00:00
|
|
|
|
var Cron *cron.Cron
|
2017-04-25 17:47:38 +00:00
|
|
|
|
var runInstance Instance
|
|
|
|
|
|
2017-04-28 03:54:46 +00:00
|
|
|
|
// 任务ID作为Key, 不会出现并发写, 不加锁
|
2017-04-25 17:47:38 +00:00
|
|
|
|
type Instance struct {
|
|
|
|
|
Status map[int]bool
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 是否有任务处于运行中
|
|
|
|
|
func (i *Instance) has(key int) bool {
|
|
|
|
|
running, ok := i.Status[key]
|
|
|
|
|
if ok && running {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (i *Instance) add(key int) {
|
|
|
|
|
i.Status[key] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (i *Instance) done(key int) {
|
|
|
|
|
i.Status[key] = false
|
|
|
|
|
}
|
2017-04-13 09:35:59 +00:00
|
|
|
|
|
2017-04-02 02:19:52 +00:00
|
|
|
|
type Task struct{}
|
2017-03-10 09:24:06 +00:00
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
type TaskResult struct {
|
|
|
|
|
Result string
|
|
|
|
|
Err error
|
|
|
|
|
RetryTimes int8
|
2017-05-04 02:47:14 +00:00
|
|
|
|
IsAsync bool
|
2017-04-21 06:50:40 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-03-24 09:55:44 +00:00
|
|
|
|
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
|
2017-04-02 02:19:52 +00:00
|
|
|
|
func (task *Task) Initialize() {
|
2017-04-13 09:35:59 +00:00
|
|
|
|
Cron = cron.New()
|
|
|
|
|
Cron.Start()
|
2017-04-25 17:47:38 +00:00
|
|
|
|
runInstance = Instance{make(map[int]bool)}
|
2017-04-02 02:38:49 +00:00
|
|
|
|
taskModel := new(models.Task)
|
|
|
|
|
taskList, err := taskModel.ActiveList()
|
|
|
|
|
if err != nil {
|
2017-05-04 06:02:50 +00:00
|
|
|
|
logger.Error("定时任务初始化#获取任务列表错误-", err.Error())
|
2017-04-02 02:38:49 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if len(taskList) == 0 {
|
2017-04-03 07:27:19 +00:00
|
|
|
|
logger.Debug("任务列表为空")
|
2017-04-02 02:38:49 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
2017-04-20 01:36:42 +00:00
|
|
|
|
task.BatchAdd(taskList)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 批量添加任务
|
|
|
|
|
func (task *Task) BatchAdd(tasks []models.TaskHost) {
|
|
|
|
|
for _, item := range tasks {
|
2017-04-02 02:38:49 +00:00
|
|
|
|
task.Add(item)
|
|
|
|
|
}
|
2017-03-10 09:24:06 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 添加任务
|
2017-04-10 09:37:16 +00:00
|
|
|
|
func (task *Task) Add(taskModel models.TaskHost) {
|
2017-04-21 06:50:40 +00:00
|
|
|
|
taskFunc := createJob(taskModel)
|
2017-04-02 02:38:49 +00:00
|
|
|
|
if taskFunc == nil {
|
2017-04-13 13:54:13 +00:00
|
|
|
|
logger.Error("创建任务处理Job失败,不支持的任务协议#", taskModel.Protocol)
|
2017-04-02 02:38:49 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
2017-04-13 09:35:59 +00:00
|
|
|
|
|
|
|
|
|
cronName := strconv.Itoa(taskModel.Id)
|
2017-04-22 15:39:33 +00:00
|
|
|
|
// Cron任务采用数组存储, 删除任务需遍历数组, 并对数组重新赋值, 任务较多时,有性能问题
|
2017-04-13 09:35:59 +00:00
|
|
|
|
Cron.RemoveJob(cronName)
|
2017-04-13 13:54:13 +00:00
|
|
|
|
err := Cron.AddFunc(taskModel.Spec, taskFunc, cronName)
|
2017-04-14 10:05:34 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("添加任务到调度器失败#", err)
|
|
|
|
|
}
|
2017-03-10 09:24:06 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-04-21 05:36:45 +00:00
|
|
|
|
// 直接运行任务
|
|
|
|
|
func (task *Task) Run(taskModel models.TaskHost) {
|
2017-04-21 06:50:40 +00:00
|
|
|
|
go createJob(taskModel)()
|
2017-04-21 05:36:45 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-03-10 09:24:06 +00:00
|
|
|
|
type Handler interface {
|
2017-04-10 09:37:16 +00:00
|
|
|
|
Run(taskModel models.TaskHost) (string, error)
|
2017-03-10 09:24:06 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-04-17 08:13:43 +00:00
|
|
|
|
// 本地命令
|
2017-04-16 18:01:41 +00:00
|
|
|
|
type LocalCommandHandler struct {}
|
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
// 运行本地命令
|
2017-04-16 18:01:41 +00:00
|
|
|
|
func (h *LocalCommandHandler) Run(taskModel models.TaskHost) (string, error) {
|
2017-04-17 08:13:43 +00:00
|
|
|
|
if taskModel.Command == "" {
|
|
|
|
|
return "", errors.New("invalid command")
|
2017-04-16 18:01:41 +00:00
|
|
|
|
}
|
2017-04-20 01:36:42 +00:00
|
|
|
|
|
2017-04-20 08:59:03 +00:00
|
|
|
|
if utils.IsWindows() {
|
|
|
|
|
return h.runOnWindows(taskModel)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return h.runOnUnix(taskModel)
|
|
|
|
|
}
|
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
// 执行Windows命令
|
2017-04-20 08:59:03 +00:00
|
|
|
|
func (h *LocalCommandHandler) runOnWindows(taskModel models.TaskHost) (string, error) {
|
|
|
|
|
outputGBK, err := utils.ExecShellWithTimeout(taskModel.Timeout, "cmd", "/C", taskModel.Command)
|
|
|
|
|
// windows平台编码为gbk,需转换为utf8才能入库
|
|
|
|
|
outputUTF8, ok := utils.GBK2UTF8(outputGBK)
|
|
|
|
|
if ok {
|
|
|
|
|
return outputUTF8, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return "命令输出转换编码失败(gbk to utf8)", err
|
|
|
|
|
}
|
2017-04-20 01:36:42 +00:00
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
// 执行Unix命令
|
2017-04-20 08:59:03 +00:00
|
|
|
|
func (h *LocalCommandHandler) runOnUnix(taskModel models.TaskHost) (string, error) {
|
|
|
|
|
return utils.ExecShellWithTimeout(taskModel.Timeout, "/bin/bash", "-c", taskModel.Command)
|
2017-04-16 18:01:41 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-03-23 05:31:16 +00:00
|
|
|
|
// HTTP任务
|
2017-04-02 02:19:52 +00:00
|
|
|
|
type HTTPHandler struct{}
|
2017-03-10 09:24:06 +00:00
|
|
|
|
|
2017-04-10 09:37:16 +00:00
|
|
|
|
func (h *HTTPHandler) Run(taskModel models.TaskHost) (result string, err error) {
|
2017-04-28 03:54:46 +00:00
|
|
|
|
resp := httpclient.Get(taskModel.Command, taskModel.Timeout)
|
2017-04-22 15:39:33 +00:00
|
|
|
|
// 返回状态码非200,均为失败
|
2017-04-21 06:50:40 +00:00
|
|
|
|
if resp.StatusCode != 200 {
|
2017-04-28 03:54:46 +00:00
|
|
|
|
return resp.Body, errors.New(fmt.Sprintf("HTTP状态码非200-->%d", resp.StatusCode))
|
2017-04-02 02:38:49 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-04-28 03:54:46 +00:00
|
|
|
|
return resp.Body, err
|
2017-03-10 09:24:06 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-04-01 12:28:30 +00:00
|
|
|
|
// SSH-command任务
|
2017-04-02 02:19:52 +00:00
|
|
|
|
type SSHCommandHandler struct{}
|
2017-03-10 09:24:06 +00:00
|
|
|
|
|
2017-04-10 09:37:16 +00:00
|
|
|
|
func (h *SSHCommandHandler) Run(taskModel models.TaskHost) (string, error) {
|
2017-05-01 05:59:52 +00:00
|
|
|
|
hostModel := new(models.Host)
|
|
|
|
|
err := hostModel.Find(int(taskModel.HostId))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
sshConfig := ssh.SSHConfig{}
|
|
|
|
|
sshConfig.User = hostModel.Username
|
|
|
|
|
sshConfig.Host = hostModel.Name
|
|
|
|
|
sshConfig.Port = hostModel.Port
|
2017-05-02 07:26:59 +00:00
|
|
|
|
sshConfig.ExecTimeout = taskModel.Timeout
|
2017-05-01 05:59:52 +00:00
|
|
|
|
sshConfig.AuthType = hostModel.AuthType
|
|
|
|
|
var password string
|
|
|
|
|
var privateKey string
|
|
|
|
|
if hostModel.AuthType == ssh.HostPassword {
|
|
|
|
|
password, err = hostModel.GetPasswordByHost(hostModel.Name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
sshConfig.Password = password
|
|
|
|
|
} else {
|
|
|
|
|
privateKey, err = hostModel.GetPrivateKeyByHost(hostModel.Name)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
sshConfig.PrivateKey = privateKey
|
2017-04-02 02:38:49 +00:00
|
|
|
|
}
|
2017-05-01 05:59:52 +00:00
|
|
|
|
|
2017-04-10 09:37:16 +00:00
|
|
|
|
return ssh.Exec(sshConfig, taskModel.Command)
|
2017-03-23 05:31:16 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
// 创建任务日志
|
2017-05-04 02:47:14 +00:00
|
|
|
|
func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, string, error) {
|
2017-04-02 02:38:49 +00:00
|
|
|
|
taskLogModel := new(models.TaskLog)
|
2017-04-14 10:05:34 +00:00
|
|
|
|
taskLogModel.TaskId = taskModel.Id
|
2017-04-10 09:37:16 +00:00
|
|
|
|
taskLogModel.Name = taskModel.Task.Name
|
2017-04-06 06:40:48 +00:00
|
|
|
|
taskLogModel.Spec = taskModel.Spec
|
|
|
|
|
taskLogModel.Protocol = taskModel.Protocol
|
|
|
|
|
taskLogModel.Command = taskModel.Command
|
|
|
|
|
taskLogModel.Timeout = taskModel.Timeout
|
2017-04-20 08:59:03 +00:00
|
|
|
|
if taskModel.Protocol == models.TaskSSH {
|
|
|
|
|
taskLogModel.Hostname = taskModel.Alias + "-" + taskModel.Name
|
|
|
|
|
}
|
2017-04-02 02:38:49 +00:00
|
|
|
|
taskLogModel.StartTime = time.Now()
|
2017-04-25 17:47:38 +00:00
|
|
|
|
taskLogModel.Status = status
|
2017-05-04 02:47:14 +00:00
|
|
|
|
// SSH执行远程命令,后台运行
|
|
|
|
|
var notifyId string = ""
|
|
|
|
|
if taskModel.Timeout == -1 && taskModel.Protocol == models.TaskSSH {
|
|
|
|
|
notifyId = utils.RandString(32);
|
|
|
|
|
taskLogModel.NotifyId = notifyId;
|
|
|
|
|
}
|
2017-04-02 02:38:49 +00:00
|
|
|
|
insertId, err := taskLogModel.Create()
|
2017-03-23 05:31:16 +00:00
|
|
|
|
|
2017-05-04 02:47:14 +00:00
|
|
|
|
return insertId, notifyId, err
|
2017-03-24 05:06:53 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
// 更新任务日志
|
|
|
|
|
func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) {
|
2017-04-02 02:38:49 +00:00
|
|
|
|
taskLogModel := new(models.TaskLog)
|
|
|
|
|
var status models.Status
|
2017-04-21 09:41:59 +00:00
|
|
|
|
var result string = taskResult.Result
|
2017-04-21 06:50:40 +00:00
|
|
|
|
if taskResult.Err != nil {
|
2017-04-02 02:38:49 +00:00
|
|
|
|
status = models.Failure
|
2017-05-04 02:47:14 +00:00
|
|
|
|
} else if taskResult.IsAsync {
|
|
|
|
|
status = models.Background
|
2017-04-02 02:38:49 +00:00
|
|
|
|
} else {
|
|
|
|
|
status = models.Finish
|
|
|
|
|
}
|
|
|
|
|
return taskLogModel.Update(taskLogId, models.CommonMap{
|
2017-04-21 09:41:59 +00:00
|
|
|
|
"retry_times": taskResult.RetryTimes,
|
2017-04-02 02:38:49 +00:00
|
|
|
|
"status": status,
|
|
|
|
|
"result": result,
|
|
|
|
|
})
|
2017-03-23 05:31:16 +00:00
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2017-04-21 06:50:40 +00:00
|
|
|
|
func createJob(taskModel models.TaskHost) cron.FuncJob {
|
|
|
|
|
var handler Handler = createHandler(taskModel)
|
2017-04-04 09:22:48 +00:00
|
|
|
|
if handler == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2017-04-02 02:38:49 +00:00
|
|
|
|
taskFunc := func() {
|
2017-05-04 02:47:14 +00:00
|
|
|
|
taskLogId := beforeExecJob(&taskModel)
|
|
|
|
|
if taskLogId <= 0 {
|
2017-04-02 02:38:49 +00:00
|
|
|
|
return
|
|
|
|
|
}
|
2017-04-21 06:50:40 +00:00
|
|
|
|
taskResult := execJob(handler, taskModel)
|
2017-05-04 02:47:14 +00:00
|
|
|
|
afterExecJob(taskModel, taskResult, taskLogId)
|
2017-04-02 02:38:49 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return taskFunc
|
2017-04-02 02:19:52 +00:00
|
|
|
|
}
|
2017-04-21 06:50:40 +00:00
|
|
|
|
|
|
|
|
|
func createHandler(taskModel models.TaskHost) Handler {
|
|
|
|
|
var handler Handler = nil
|
|
|
|
|
switch taskModel.Protocol {
|
|
|
|
|
case models.TaskHTTP:
|
|
|
|
|
handler = new(HTTPHandler)
|
|
|
|
|
case models.TaskSSH:
|
|
|
|
|
handler = new(SSHCommandHandler)
|
|
|
|
|
case models.TaskLocalCommand:
|
|
|
|
|
handler = new(LocalCommandHandler)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return handler;
|
|
|
|
|
}
|
|
|
|
|
|
2017-05-04 02:47:14 +00:00
|
|
|
|
func beforeExecJob(taskModel *models.TaskHost) (taskLogId int64) {
|
|
|
|
|
if taskModel.Multi == 0 && runInstance.has(taskModel.Id) {
|
|
|
|
|
createTaskLog(*taskModel, models.Cancel)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if taskModel.Multi == 0 {
|
|
|
|
|
runInstance.add(taskModel.Id)
|
|
|
|
|
}
|
|
|
|
|
taskLogId, notifyId, err := createTaskLog(*taskModel, models.Running)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("任务开始执行#写入任务日志失败-", err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// 设置notifyId到环境变量中
|
|
|
|
|
if notifyId != "" {
|
|
|
|
|
taskModel.Command = fmt.Sprintf("export GOCRON_TASK_ID=%s;%s", notifyId, taskModel.Command)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return taskLogId
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId int64) {
|
|
|
|
|
if taskResult.Err != nil {
|
|
|
|
|
taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result
|
|
|
|
|
}
|
|
|
|
|
if taskModel.Protocol == models.TaskSSH && taskModel.Timeout == -1 {
|
|
|
|
|
taskResult.IsAsync = true
|
|
|
|
|
}
|
|
|
|
|
_, err := updateTaskLog(taskLogId, taskResult)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("任务结束#更新任务日志失败-", err)
|
|
|
|
|
}
|
|
|
|
|
if taskResult.IsAsync {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sendNotification(taskModel, taskResult)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 发送任务结果通知
|
|
|
|
|
func sendNotification(taskModel models.TaskHost, taskResult TaskResult) {
|
|
|
|
|
var statusName string
|
|
|
|
|
// 未开启通知
|
|
|
|
|
if taskModel.NotifyStatus == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if taskModel.NotifyStatus == 1 && taskResult.Err == nil {
|
|
|
|
|
// 执行失败才发送通知
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if taskResult.Err != nil {
|
|
|
|
|
statusName = "失败"
|
|
|
|
|
} else {
|
|
|
|
|
statusName = "成功"
|
|
|
|
|
}
|
|
|
|
|
// 发送通知
|
|
|
|
|
msg := notify.Message{
|
|
|
|
|
"task_type": taskModel.NotifyType,
|
|
|
|
|
"task_receiver_id": taskModel.NotifyReceiverId,
|
|
|
|
|
"name": taskModel.Task.Name,
|
|
|
|
|
"output": taskResult.Result,
|
|
|
|
|
"status": statusName,
|
|
|
|
|
"taskId": taskModel.Id,
|
|
|
|
|
};
|
|
|
|
|
notify.Push(msg)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 执行具体任务
|
2017-04-21 06:50:40 +00:00
|
|
|
|
func execJob(handler Handler, taskModel models.TaskHost) TaskResult {
|
2017-05-04 02:47:14 +00:00
|
|
|
|
if taskModel.Multi == 0 {
|
|
|
|
|
defer runInstance.done(taskModel.Id)
|
|
|
|
|
}
|
2017-04-21 06:50:40 +00:00
|
|
|
|
// 默认只运行任务一次
|
|
|
|
|
var execTimes int8 = 1
|
|
|
|
|
if (taskModel.RetryTimes > 0) {
|
|
|
|
|
execTimes += taskModel.RetryTimes
|
|
|
|
|
}
|
|
|
|
|
var i int8 = 0
|
|
|
|
|
var output string
|
|
|
|
|
var err error
|
|
|
|
|
for i < execTimes {
|
|
|
|
|
output, err = handler.Run(taskModel)
|
|
|
|
|
if err == nil {
|
|
|
|
|
return TaskResult{Result: output, Err: err, RetryTimes: i}
|
|
|
|
|
}
|
|
|
|
|
i++
|
|
|
|
|
if i < execTimes {
|
2017-04-21 09:41:59 +00:00
|
|
|
|
logger.Warnf("任务执行失败#任务id-%d#重试第%d次#输出-%s#错误-%s", taskModel.Id, i, output, err.Error())
|
2017-04-22 15:39:33 +00:00
|
|
|
|
// 重试间隔时间,每次递增1分钟
|
|
|
|
|
time.Sleep( time.Duration(i) * time.Minute)
|
2017-04-21 06:50:40 +00:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return TaskResult{Result: output, Err: err, RetryTimes: taskModel.RetryTimes}
|
|
|
|
|
}
|