From 7e12c60415c2648910cfa1bf00d63f4a90c01436 Mon Sep 17 00:00:00 2001 From: miraclesu Date: Tue, 7 Mar 2017 17:07:19 +0800 Subject: [PATCH] =?UTF-8?q?proc:=20=E6=89=A7=E8=A1=8C=E4=B8=AD=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E4=BF=A1=E6=81=AF=E8=BF=87=E6=9C=9F=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/job.go | 2 +- models/proc.go | 50 ++++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/models/job.go b/models/job.go index be5dc84..484a9d3 100644 --- a/models/job.go +++ b/models/job.go @@ -206,7 +206,7 @@ func (j *Job) Run() { NodeID: j.runOn, Time: t, } - go p.Start() + p.Start() if err := cmd.Wait(); err != nil { p.Stop() diff --git a/models/proc.go b/models/proc.go index a7b3a53..7728a8c 100644 --- a/models/proc.go +++ b/models/proc.go @@ -6,9 +6,28 @@ import ( client "github.com/coreos/etcd/clientv3" + "sunteng/commons/log" "sunteng/cronsun/conf" ) +var ( + leaseID client.LeaseID +) + +func ProcessKeepAlive() error { + if conf.Config.ProcTtl == 0 { + return nil + } + + resp, err := DefalutClient.Grant(context.TODO(), conf.Config.ProcTtl+5) + if err != nil { + return err + } + + leaseID = resp.ID + return nil +} + // 当前执行中的任务信息 // key: /cronsun/proc/node/job id/pid // value: 开始执行时间 @@ -18,6 +37,8 @@ type Process struct { JobID string `json:"job_id"` NodeID string `json:"node_id"` Time time.Time `json:"name"` // 开始执行时间 + + running bool } func (p *Process) Key() string { @@ -39,24 +60,33 @@ func (p *Process) Count() (int64, error) { } func (p *Process) Put() error { - if conf.Config.ProcTtl == 0 { + if leaseID == 0 { _, err := DefalutClient.Put(p.Key(), p.Val()) return err } - resp, err := DefalutClient.Grant(context.TODO(), conf.Config.ProcTtl) - if err != nil { - return err - } - - _, err = DefalutClient.Put(p.Key(), p.Val(), client.WithLease(resp.ID)) + _, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(leaseID)) return err } -func (p *Process) Start() error { - return nil +func (p *Process) Del() error { + _, err := DefalutClient.Delete(p.Key()) + return err +} + +func (p *Process) Start() { + if err := p.Put(); err != nil { + log.Warnf("proc put err: %s", err.Error()) + return + } + + p.running = true } func (p *Process) Stop() error { - return nil + if !p.running { + return nil + } + + return p.Del() }