diff --git a/models/job.go b/models/job.go index 9a624d0..cd4d550 100644 --- a/models/job.go +++ b/models/job.go @@ -163,6 +163,8 @@ func (j *Job) limit() bool { return false } + // 更精确的控制是加锁 + // 两次运行时间极为接近的任务才可能出现控制不精确的情况 count := atomic.LoadInt64(&j.count) if j.Parallels <= count { j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count)) diff --git a/models/proc.go b/models/proc.go index 24f21d8..6bc22f9 100644 --- a/models/proc.go +++ b/models/proc.go @@ -3,6 +3,7 @@ package models import ( "fmt" "sync" + "sync/atomic" "time" client "github.com/coreos/etcd/clientv3" @@ -132,8 +133,8 @@ type Process struct { NodeID string `json:"nodeId"` Time time.Time `json:"time"` // 开始执行时间 - running bool - hasPut bool + running int32 + hasPut int32 wg sync.WaitGroup done chan struct{} } @@ -174,28 +175,31 @@ func (j *Job) CountRunning() (int64, error) { } func (p *Process) put() error { - if p.hasPut || !p.running { + if atomic.LoadInt32(&p.running) != 1 { + return nil + } + + if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) { return nil } id := lID.get() if id < 0 { - _, err := DefalutClient.Put(p.Key(), p.Val()) - if err == nil { - p.hasPut = true + if _, err := DefalutClient.Put(p.Key(), p.Val()); err != nil { + atomic.SwapInt32(&p.hasPut, 0) + return err } - return err } _, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) - if err == nil { - p.hasPut = true + if err != nil { + atomic.SwapInt32(&p.hasPut, 0) } return err } func (p *Process) del() error { - if !p.hasPut { + if atomic.LoadInt32(&p.hasPut) != 1 { return nil } @@ -204,11 +208,14 @@ func (p *Process) del() error { } func (p *Process) Start() { - if p == nil || p.running { + if p == nil { + return + } + + if !atomic.CompareAndSwapInt32(&p.running, 0, 1) { return } - p.running = true if conf.Config.ProcReq == 0 { if err := p.put(); err != nil { log.Warnf("proc put err: %s", err.Error()) @@ -231,17 +238,15 @@ func (p *Process) Start() { } func (p *Process) Stop() error { - if p == nil || !p.running { + if p == nil || atomic.LoadInt32(&p.running) != 1 { return nil } - p.running = false if p.done != nil { close(p.done) } - p.wg.Wait() + err := p.del() - p.hasPut = false return err }