From de9690930f0fc7779106d69cfff7b6b19b5f56c5 Mon Sep 17 00:00:00 2001 From: miraclesu <suchuangji@gmail.com> Date: Tue, 18 Apr 2017 17:12:40 +0800 Subject: [PATCH] =?UTF-8?q?proc:=20=E6=89=A7=E8=A1=8C=20etcd=20=E5=91=BD?= =?UTF-8?q?=E4=BB=A4=E8=B6=85=E6=97=B6=E6=97=B6=EF=BC=8C=E6=9C=89=E5=8F=AF?= =?UTF-8?q?=E8=83=BD=E5=B7=B2=E7=BB=8F=E6=89=A7=E8=A1=8C=E6=88=90=E5=8A=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/proc.go | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/models/proc.go b/models/proc.go index 6bc22f9..87f347d 100644 --- a/models/proc.go +++ b/models/proc.go @@ -174,28 +174,31 @@ func (j *Job) CountRunning() (int64, error) { return resp.Count, nil } -func (p *Process) put() error { +func (p *Process) put() (err error) { if atomic.LoadInt32(&p.running) != 1 { - return nil + return } if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) { - return nil + return } + defer func() { + // 如果是超时,值可能已经写入 etcd + if err != nil && !strings.Contains(err.Error(), "deadline") { + atomic.SwapInt32(&p.hasPut, 0) + } + }() + id := lID.get() if id < 0 { - if _, err := DefalutClient.Put(p.Key(), p.Val()); err != nil { - atomic.SwapInt32(&p.hasPut, 0) - return err + if _, err = DefalutClient.Put(p.Key(), p.Val()); err != nil { + return } } - _, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) - if err != nil { - atomic.SwapInt32(&p.hasPut, 0) - } - return err + _, err = DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id)) + return } func (p *Process) del() error { @@ -218,7 +221,7 @@ func (p *Process) Start() { if conf.Config.ProcReq == 0 { if err := p.put(); err != nil { - log.Warnf("proc put err: %s", err.Error()) + log.Warnf("proc put[%s] err: %s", p.Key(), err.Error()) } return } @@ -230,7 +233,7 @@ func (p *Process) Start() { case <-p.done: case <-time.After(time.Duration(conf.Config.ProcReq) * time.Second): if err := p.put(); err != nil { - log.Warnf("proc put err: %s", err.Error()) + log.Warnf("proc put[%s] err: %s", p.Key(), err.Error()) } } p.wg.Done()