diff --git a/models/proc.go b/models/proc.go index 6bc22f9..87f347d 100644 --- a/models/proc.go +++ b/models/proc.go @@ -174,28 +174,31 @@ func (j *Job) CountRunning() (int64, error) { return resp.Count, nil } -func (p *Process) put() error { +func (p *Process) put() (err error) { if atomic.LoadInt32(&p.running) != 1 { - return nil + return } if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) { - return nil + return } + defer func() { + // 如果是超时,值可能已经写入 etcd + if err != nil && !strings.Contains(err.Error(), "deadline") { + atomic.SwapInt32(&p.hasPut, 0) + } + }() + id := lID.get() if id < 0 { - if _, err := DefalutClient.Put(p.Key(), p.Val()); err != nil { - atomic.SwapInt32(&p.hasPut, 0) - return err + if _, err = DefalutClient.Put(p.Key(), p.Val()); err != nil { + return } } - _, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) - if err != nil { - atomic.SwapInt32(&p.hasPut, 0) - } - return err + _, err = DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) + return } func (p *Process) del() error { @@ -218,7 +221,7 @@ func (p *Process) Start() { if conf.Config.ProcReq == 0 { if err := p.put(); err != nil { - log.Warnf("proc put err: %s", err.Error()) + log.Warnf("proc put[%s] err: %s", p.Key(), err.Error()) } return } @@ -230,7 +233,7 @@ func (p *Process) Start() { case <-p.done: case <-time.After(time.Duration(conf.Config.ProcReq) * time.Second): if err := p.put(); err != nil { - log.Warnf("proc put err: %s", err.Error()) + log.Warnf("proc put[%s] err: %s", p.Key(), err.Error()) } } p.wg.Done()