job: 任务失败可重试

pull/1/head
miraclesu 2017-03-17 11:20:13 +08:00
parent 8c8d0f12d5
commit 7a9647b8aa
1 changed files with 30 additions and 6 deletions

View File

@ -40,6 +40,12 @@ type Job struct {
// 设置任务在单个节点上可以同时允许多少个
// 针对两次任务执行间隔比任务执行时间要长的任务启用
Parallels int64 `json:"parallels"`
// 执行任务失败重试次数
// 默认为 0不重试
Retry int `json:"retry"`
// 执行任务失败重试时间间隔
// 单位秒,如果不大于 0 则马上重试
Interval int `json:"interval"`
// 平均执行时间,单位 ms
AvgTime int64 `json:"avg_time"`
@ -68,6 +74,23 @@ func (c *Cmd) GetID() string {
return c.Job.ID + c.JobRule.ID
}
func (c *Cmd) Run() {
if c.Job.Retry <= 0 {
c.Job.Run()
return
}
for i := 0; i < c.Job.Retry; i++ {
if c.Job.Run() {
return
}
if c.Job.Interval > 0 {
time.Sleep(time.Duration(c.Job.Interval) * time.Second)
}
}
}
// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理
func (j *JobRule) included(nid string, gs map[string]*Group) bool {
for i, count := 0, len(j.NodeIDs); i < count; i++ {
@ -197,10 +220,10 @@ func (j *Job) unlimit() {
}
// Run 执行任务
func (j *Job) Run() {
func (j *Job) Run() bool {
// 同时执行任务数限制
if j.limit() {
return
return true
}
defer j.unlimit()
@ -216,13 +239,13 @@ func (j *Job) Run() {
u, err := user.Lookup(j.User)
if err != nil {
j.Fail(t, err.Error())
return
return false
}
uid, err := strconv.Atoi(u.Uid)
if err != nil {
j.Fail(t, "not support run with user on windows")
return
return false
}
gid, _ := strconv.Atoi(u.Gid)
sysProcAttr = &syscall.SysProcAttr{
@ -247,7 +270,7 @@ func (j *Job) Run() {
cmd.Stderr = &b
if err := cmd.Start(); err != nil {
j.Fail(t, fmt.Sprintf("%s", err.Error()))
return
return false
}
proc = &Process{
@ -262,10 +285,11 @@ func (j *Job) Run() {
if err := cmd.Wait(); err != nil {
j.Fail(t, fmt.Sprintf("%s", err.Error()))
return
return false
}
j.Success(t, b.String())
return true
}
func (j *Job) RunWithRecovery() {