增加异步任务回调接口

pull/21/merge
ouqiang 2017-05-04 10:47:14 +08:00
parent bc56743a63
commit 90322c1230
12 changed files with 144 additions and 97 deletions

View File

@ -39,7 +39,8 @@
2. `go get -d https://github.com/ouqiang/gocron`
3. 编译 `go build`
4. 启动、访问方式同上
5. 生成压缩包Windows: gocron.zip, 其他平台: gocron.tar.gz ./build.sh -p 平台 -a CPU架构 例 ./build.sh -p darwin -a amd64
5. 生成压缩包Windows: gocron.zip, 其他平台: gocron.tar.gz
> ./build.sh -p 平台 -a CPU架构 例 ./build.sh -p darwin -a amd64
### 启动可选参数

View File

@ -26,6 +26,7 @@ const (
Running Status = 1 // 运行中
Finish Status = 2 // 完成
Cancel Status = 3 // 取消
Background Status = 4 // 后台运行
)
const (

View File

@ -21,7 +21,8 @@ type TaskLog struct {
Hostname string `xorm:"varchar(128) notnull defalut '' "` // SSH主机名逗号分隔
StartTime time.Time `xorm:"datetime created"` // 开始执行时间
EndTime time.Time `xorm:"datetime updated"` // 执行完成(失败)时间
Status Status `xorm:"tinyint notnull default 1"` // 状态 0:执行失败 1:执行中 2:执行完毕 3:任务取消(上次任务未执行完成)
Status Status `xorm:"tinyint notnull default 1"` // 状态 0:执行失败 1:执行中 2:执行完毕 3:任务取消(上次任务未执行完成) 4:异步执行
NotifyId string `xorm:"varchar(32) notnull default '' "` // 回调通知ID
Result string `xorm:"mediumtext notnull defalut '' "` // 执行结果
TotalTime int `xorm:"-"` // 执行总时长
BaseModel `xorm:"-"`
@ -41,6 +42,12 @@ func (taskLog *TaskLog) Update(id int64, data CommonMap) (int64, error) {
return Db.Table(taskLog).ID(id).Update(data)
}
func (taskLog *TaskLog) UpdateStatus(notifyId string, status Status, result string) (int64, error) {
taskLog.Status = status
taskLog.Result = result
return Db.Cols("status,result").Where("notify_id = ?", notifyId).Update(taskLog)
}
func (taskLog *TaskLog) setStatus(id int64, status Status) (int64, error) {
return taskLog.Update(id, CommonMap{"status": status})
}

View File

@ -86,8 +86,14 @@ func Exec(sshConfig SSHConfig, cmd string) (output string, err error) {
return "", err
}
defer session.Close()
if sshConfig.ExecTimeout <= 0 {
// 后台运行
if sshConfig.ExecTimeout < 0 {
go session.CombinedOutput(cmd)
time.Sleep(5 * time.Second)
return "", nil
}
// 不限制超时
if sshConfig.ExecTimeout == 0 {
outputByte, execErr := session.CombinedOutput(cmd)
output = string(outputByte)
err = execErr

View File

@ -87,6 +87,11 @@ func Register(m *macaron.Macaron) {
m.Get("/login-log", loginlog.Index)
})
// API
m.Group("/api/v1", func() {
m.Get("/tasklog/update-status", tasklog.UpdateStatus)
});
// 404错误
m.NotFound(func(ctx *macaron.Context) {
if isGetRequest(ctx) && !isAjaxRequest(ctx) {
@ -162,7 +167,7 @@ func userAuth(m *macaron.Macaron) {
}
uri := ctx.Req.URL.Path
found := false
excludePaths := []string{"/install", "/user/login"}
excludePaths := []string{"/install", "/user/login", "/"}
for _, path := range excludePaths {
if strings.HasPrefix(uri, path) {
found = true

View File

@ -20,7 +20,7 @@ type TaskForm struct {
Spec string `binding:"Required;MaxSize(64)"`
Protocol models.TaskProtocol `binding:"In(1,2,3)"`
Command string `binding:"Required;MaxSize(512)"`
Timeout int `binding:"Range(0,86400)"`
Timeout int `binding:"Range(-1,86400)"`
Multi int8 `binding:"In(1,2)"`
RetryTimes int8
HostId int16

View File

@ -49,6 +49,30 @@ func Clear(ctx *macaron.Context) string {
return json.Success(utils.SuccessContent, nil)
}
// 更新任务状态
func UpdateStatus(ctx *macaron.Context) string {
id := ctx.QueryTrim("id")
status := ctx.QueryInt("status")
result := ctx.QueryTrim("result")
json := utils.JsonResponse{}
if id == "" {
return json.CommonFailure("任务ID不能为空")
}
if status != 1 && status != 2 {
return json.CommonFailure("status值错误")
}
if status == 1 {
status -= 1
}
taskLogModel := new(models.TaskLog)
affectRows, err := taskLogModel.UpdateStatus(id, models.Status(status), result)
if err != nil || affectRows == 0 {
return json.CommonFailure("更新任务状态失败")
}
return json.Success("success", nil)
}
// 解析查询参数
func parseQueryParams(ctx *macaron.Context) (models.CommonMap) {
var params models.CommonMap = models.CommonMap{}

View File

@ -46,6 +46,7 @@ type TaskResult struct {
Result string
Err error
RetryTimes int8
IsAsync bool
}
// 初始化任务, 从数据库取出所有任务, 添加到定时任务并运行
@ -180,7 +181,7 @@ func (h *SSHCommandHandler) Run(taskModel models.TaskHost) (string, error) {
}
// 创建任务日志
func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, error) {
func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, string, error) {
taskLogModel := new(models.TaskLog)
taskLogModel.TaskId = taskModel.Id
taskLogModel.Name = taskModel.Task.Name
@ -193,9 +194,15 @@ func createTaskLog(taskModel models.TaskHost, status models.Status) (int64, erro
}
taskLogModel.StartTime = time.Now()
taskLogModel.Status = status
// SSH执行远程命令后台运行
var notifyId string = ""
if taskModel.Timeout == -1 && taskModel.Protocol == models.TaskSSH {
notifyId = utils.RandString(32);
taskLogModel.NotifyId = notifyId;
}
insertId, err := taskLogModel.Create()
return insertId, err
return insertId, notifyId, err
}
// 更新任务日志
@ -205,6 +212,8 @@ func updateTaskLog(taskLogId int64, taskResult TaskResult) (int64, error) {
var result string = taskResult.Result
if taskResult.Err != nil {
status = models.Failure
} else if taskResult.IsAsync {
status = models.Background
} else {
status = models.Finish
}
@ -222,55 +231,12 @@ func createJob(taskModel models.TaskHost) cron.FuncJob {
return nil
}
taskFunc := func() {
if taskModel.Multi == 0 && runInstance.has(taskModel.Id) {
createTaskLog(taskModel, models.Cancel)
return
}
if taskModel.Multi == 0 {
runInstance.add(taskModel.Id)
defer runInstance.done(taskModel.Id)
}
taskLogId, err := createTaskLog(taskModel, models.Running)
if err != nil {
logger.Error("任务开始执行#写入任务日志失败-", err)
taskLogId := beforeExecJob(&taskModel)
if taskLogId <= 0 {
return
}
taskResult := execJob(handler, taskModel)
if taskResult.Err != nil {
taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result
}
_, err = updateTaskLog(taskLogId, taskResult)
if err != nil {
logger.Error("任务结束#更新任务日志失败-", err)
}
var statusName string
var enableNotify bool = true
// 未开启通知
if taskModel.NotifyStatus == 0 {
enableNotify = false;
} else if taskModel.NotifyStatus == 1 && taskResult.Err == nil {
// 执行失败才发送通知
enableNotify = false
}
if taskResult.Err != nil {
statusName = "失败"
} else {
statusName = "成功"
}
if !enableNotify {
return
}
// 发送通知
msg := notify.Message{
"task_type": taskModel.NotifyType,
"task_receiver_id": taskModel.NotifyReceiverId,
"name": taskModel.Task.Name,
"output": taskResult.Result,
"status": statusName,
"taskId": taskModel.Id,
};
notify.Push(msg)
afterExecJob(taskModel, taskResult, taskLogId)
}
return taskFunc
@ -290,7 +256,78 @@ func createHandler(taskModel models.TaskHost) Handler {
return handler;
}
func beforeExecJob(taskModel *models.TaskHost) (taskLogId int64) {
if taskModel.Multi == 0 && runInstance.has(taskModel.Id) {
createTaskLog(*taskModel, models.Cancel)
return
}
if taskModel.Multi == 0 {
runInstance.add(taskModel.Id)
}
taskLogId, notifyId, err := createTaskLog(*taskModel, models.Running)
if err != nil {
logger.Error("任务开始执行#写入任务日志失败-", err)
return
}
// 设置notifyId到环境变量中
if notifyId != "" {
taskModel.Command = fmt.Sprintf("export GOCRON_TASK_ID=%s;%s", notifyId, taskModel.Command)
}
return taskLogId
}
func afterExecJob(taskModel models.TaskHost, taskResult TaskResult, taskLogId int64) {
if taskResult.Err != nil {
taskResult.Result = taskResult.Err.Error() + "\n" + taskResult.Result
}
if taskModel.Protocol == models.TaskSSH && taskModel.Timeout == -1 {
taskResult.IsAsync = true
}
_, err := updateTaskLog(taskLogId, taskResult)
if err != nil {
logger.Error("任务结束#更新任务日志失败-", err)
}
if taskResult.IsAsync {
return
}
sendNotification(taskModel, taskResult)
}
// 发送任务结果通知
func sendNotification(taskModel models.TaskHost, taskResult TaskResult) {
var statusName string
// 未开启通知
if taskModel.NotifyStatus == 0 {
return
}
if taskModel.NotifyStatus == 1 && taskResult.Err == nil {
// 执行失败才发送通知
return
}
if taskResult.Err != nil {
statusName = "失败"
} else {
statusName = "成功"
}
// 发送通知
msg := notify.Message{
"task_type": taskModel.NotifyType,
"task_receiver_id": taskModel.NotifyReceiverId,
"name": taskModel.Task.Name,
"output": taskResult.Result,
"status": statusName,
"taskId": taskModel.Id,
};
notify.Push(msg)
}
// 执行具体任务
func execJob(handler Handler, taskModel models.TaskHost) TaskResult {
if taskModel.Multi == 0 {
defer runInstance.done(taskModel.Id)
}
// 默认只运行任务一次
var execTimes int8 = 1
if (taskModel.RetryTimes > 0) {

View File

@ -65,6 +65,7 @@
{{{if gt .LoginUid 0}}}
<a class="item {{{if eq .Controller "manage"}}}active{{{end}}}" href="/manage/slack/edit"><i class="settings icon"></i></a>
{{{end}}}
<a class="item" href="https://github.com/ouqiang/gocron/wiki" target="_blank"><i class="file text icon"></i></a>
</div>
</div>
</div>

View File

@ -32,7 +32,7 @@
</div>
<div class="field">
<select name="protocol" id="protocol">
<option value="0"></option>
<option value="0"></option>
<option value="3" {{{if eq .Params.Protocol 3}}}selected{{{end}}}></option>
<option value="2" {{{if eq .Params.Protocol 2}}}selected{{{end}}} data-match="host_id" data-validate-type="selectProtocol">SSH</option>
<option value="1" {{{if eq .Params.Protocol 1}}}selected{{{end}}}>HTTP</option>
@ -59,7 +59,7 @@
</h5>
<p>ID <span class="stress">{{{.Id}}}</span></p>
<p>cron {{{.Spec}}}</p>
<p>: {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SSH {{{else if eq .Protocol 3}}}{{{end}}}</p>
<p>: {{{if eq .Protocol 1}}} HTTP {{{else if eq .Protocol 2}}} SSH {{{else if eq .Protocol 3}}}{{{end}}}</p>
<p class="sensorStatus">{{{.Command}}}</p>
<p class="sensorStatus">{{{if gt .Timeout 0}}}{{{.Timeout}}}{{{else}}}{{{end}}}</p>
<p>: {{{.RetryTimes}}}</p>

View File

@ -42,6 +42,7 @@
<option value="2" {{{if eq .Params.Status 1}}}selected{{{end}}}></option>
<option value="3" {{{if eq .Params.Status 2}}}selected{{{end}}}></option>
<option value="4" {{{if eq .Params.Status 3}}}selected{{{end}}}></option>
<option value="5" {{{if eq .Params.Status 3}}}selected{{{end}}}></option>
</select>
</div>
<div class="field">
@ -75,7 +76,7 @@
<td>{{{.RetryTimes}}}</td>
<td>{{{.Hostname}}}</td>
<td>
{{{if ne .Status 3}}}
{{{if and (ne .Status 3) (ne .Status 4)}}}
{{{if gt .TotalTime 0}}}{{{.TotalTime}}}{{{else}}}1{{{end}}}<br>
: {{{.StartTime.Format "2006-01-02 15:04:05" }}}<br>
{{{if ne .Status 1}}}
@ -92,6 +93,8 @@
<span style="color:red"></span>
{{{else if eq .Status 3}}}
<span style="color:#4499EE"></span>
{{{else if eq .Status 4}}}
<span style="color:#43A102"></span>
{{{end}}}
</td>
<td>

View File

@ -28,25 +28,6 @@
<label>
<div class="content">
crontab
<div class="ui blue message">
Linux-crontab, <br>
: <br>
<br>
1 * * * * * <br>
*/20 * * * * * 20 <br>
0 30 21 * * * 21:30:00 <br>
0 0 23 * * 6 23:00:00 <br>
: <br>
@yearly <br>
@monthly <br>
@weekly <br>
@daily <br>
@midnight <br>
@hourly <br>
@every 30s 30 <br>
@every 1m20s 120 <br>
@every 3h5m10s 3510 <br>
</div>
</div>
</label>
<div class="ui small input">
@ -56,12 +37,7 @@
</div>
<div class="three fields">
<div class="field">
<label></label>
<div class="ui blue message">
: <br>
SSH: SSH <br>
HTTP: HTTP-GET <br>
</div>
<label></label>
<select name="protocol" id="protocol">
<option value="3" {{{if .Task}}} {{{if eq .Task.Protocol 3}}}selected{{{end}}} {{{end}}}></option>
<option value="2" {{{if .Task}}} {{{if eq .Task.Protocol 2}}}selected{{{end}}} {{{end}}} data-match="host_id" data-validate-type="selectProtocol">SSH</option>
@ -87,32 +63,18 @@
<div class="two fields">
<div class="field">
<label></label>
<div class="ui blue message">
<br>
- ifconfig -a <br>
SSH - netstat -natpu <br>
HTTP - URL : http://golang.org <br>
</div>
<textarea rows="5" name="command">{{{.Task.Command}}}</textarea>
</div>
</div>
<div class="three fields">
<div class="field">
<label>()</label>
<div class="ui blue message">
0
</div>
<input type="text" name="timeout" value="{{{.Task.Timeout}}}">
</div>
</div>
<div class="two fields">
<div class="field">
<label></label>
<div class="ui blue message">
shell0, http200,
* , 123.....
1-10, 0
</div>
<label></label>
<input type="text" name="retry_times" value="{{{.Task.RetryTimes}}}">
</div>
</div>