Fixed update job issue

when update job, count is not sync. #17
pull/19/head
miraclesu 2017-06-14 16:20:09 +08:00
parent 93832d66a8
commit f236df65aa
2 changed files with 15 additions and 11 deletions

17
job.go
View File

@ -70,7 +70,7 @@ type Job struct {
// 用于存储分隔后的任务 // 用于存储分隔后的任务
cmd []string cmd []string
// 控制同时执行任务数 // 控制同时执行任务数
count int64 Count *int64 `json:"-"`
} }
type JobRule struct { type JobRule struct {
@ -169,13 +169,13 @@ func (j *Job) limit() bool {
// 更精确的控制是加锁 // 更精确的控制是加锁
// 两次运行时间极为接近的任务才可能出现控制不精确的情况 // 两次运行时间极为接近的任务才可能出现控制不精确的情况
count := atomic.LoadInt64(&j.count) count := atomic.LoadInt64(j.Count)
if j.Parallels <= count { if j.Parallels <= count {
j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count)) j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count))
return true return true
} }
atomic.AddInt64(&j.count, 1) atomic.AddInt64(j.Count, 1)
return false return false
} }
@ -183,7 +183,12 @@ func (j *Job) unlimit() {
if j.Parallels == 0 { if j.Parallels == 0 {
return return
} }
atomic.AddInt64(&j.count, -1) atomic.AddInt64(j.Count, -1)
}
func (j *Job) Init(n string) {
var c int64
j.Count, j.runOn = &c, n
} }
func (c *Cmd) lockTtl() int64 { func (c *Cmd) lockTtl() int64 {
@ -377,10 +382,6 @@ func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
return return
} }
func (j *Job) RunOn(n string) {
j.runOn = n
}
func (j *Job) alone() { func (j *Job) alone() {
if j.Kind == KindAlone { if j.Kind == KindAlone {
j.Parallels = 1 j.Parallels = 1

View File

@ -133,7 +133,7 @@ func (n *Node) loadJobs() (err error) {
} }
for _, job := range jobs { for _, job := range jobs {
job.RunOn(n.ID) job.Init(n.ID)
n.addJob(job, false) n.addJob(job, false)
} }
@ -189,6 +189,8 @@ func (n *Node) modJob(job *cronsun.Job) {
n.link.delJob(oJob) n.link.delJob(oJob)
prevCmds := oJob.Cmds(n.ID, n.groups) prevCmds := oJob.Cmds(n.ID, n.groups)
job.Count = oJob.Count
*oJob = *job *oJob = *job
cmds := oJob.Cmds(n.ID, n.groups) cmds := oJob.Cmds(n.ID, n.groups)
@ -312,6 +314,7 @@ func (n *Node) groupAddNode(g *cronsun.Group) {
n.link.delGroupJob(g.ID, jid) n.link.delGroupJob(g.ID, jid)
continue continue
} }
job.Init(n.ID)
} }
cmds := job.Cmds(n.ID, n.groups) cmds := job.Cmds(n.ID, n.groups)
@ -367,7 +370,7 @@ func (n *Node) watchJobs() {
continue continue
} }
job.RunOn(n.ID) job.Init(n.ID)
n.addJob(job, true) n.addJob(job, true)
case ev.IsModify(): case ev.IsModify():
job, err := cronsun.GetJobFromKv(ev.Kv) job, err := cronsun.GetJobFromKv(ev.Kv)
@ -376,7 +379,7 @@ func (n *Node) watchJobs() {
continue continue
} }
job.RunOn(n.ID) job.Init(n.ID)
n.modJob(job) n.modJob(job)
case ev.Type == client.EventTypeDelete: case ev.Type == client.EventTypeDelete:
n.delJob(cronsun.GetIDFromKey(string(ev.Kv.Key))) n.delJob(cronsun.GetIDFromKey(string(ev.Kv.Key)))