proc: 对变量进行原子操作,减少不确定性

pull/1/head
miraclesu 8 years ago
parent e6e96bb9a6
commit b50478afcd

@ -163,6 +163,8 @@ func (j *Job) limit() bool {
return false
}
// 更精确的控制是加锁
// 两次运行时间极为接近的任务才可能出现控制不精确的情况
count := atomic.LoadInt64(&j.count)
if j.Parallels <= count {
j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count))

@ -3,6 +3,7 @@ package models
import (
"fmt"
"sync"
"sync/atomic"
"time"
client "github.com/coreos/etcd/clientv3"
@ -132,8 +133,8 @@ type Process struct {
NodeID string `json:"nodeId"`
Time time.Time `json:"time"` // 开始执行时间
running bool
hasPut bool
running int32
hasPut int32
wg sync.WaitGroup
done chan struct{}
}
@ -174,28 +175,31 @@ func (j *Job) CountRunning() (int64, error) {
}
func (p *Process) put() error {
if p.hasPut || !p.running {
if atomic.LoadInt32(&p.running) != 1 {
return nil
}
if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) {
return nil
}
id := lID.get()
if id < 0 {
_, err := DefalutClient.Put(p.Key(), p.Val())
if err == nil {
p.hasPut = true
if _, err := DefalutClient.Put(p.Key(), p.Val()); err != nil {
atomic.SwapInt32(&p.hasPut, 0)
return err
}
return err
}
_, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
if err == nil {
p.hasPut = true
if err != nil {
atomic.SwapInt32(&p.hasPut, 0)
}
return err
}
func (p *Process) del() error {
if !p.hasPut {
if atomic.LoadInt32(&p.hasPut) != 1 {
return nil
}
@ -204,11 +208,14 @@ func (p *Process) del() error {
}
func (p *Process) Start() {
if p == nil || p.running {
if p == nil {
return
}
if !atomic.CompareAndSwapInt32(&p.running, 0, 1) {
return
}
p.running = true
if conf.Config.ProcReq == 0 {
if err := p.put(); err != nil {
log.Warnf("proc put err: %s", err.Error())
@ -231,17 +238,15 @@ func (p *Process) Start() {
}
func (p *Process) Stop() error {
if p == nil || !p.running {
if p == nil || atomic.LoadInt32(&p.running) != 1 {
return nil
}
p.running = false
if p.done != nil {
close(p.done)
}
p.wg.Wait()
err := p.del()
p.hasPut = false
return err
}

Loading…
Cancel
Save