diff --git a/models/job.go b/models/job.go index be5dc84..484a9d3 100644 --- a/models/job.go +++ b/models/job.go @@ -206,7 +206,7 @@ func (j *Job) Run() { NodeID: j.runOn, Time: t, } - go p.Start() + p.Start() if err := cmd.Wait(); err != nil { p.Stop() diff --git a/models/proc.go b/models/proc.go index a7b3a53..7728a8c 100644 --- a/models/proc.go +++ b/models/proc.go @@ -6,9 +6,28 @@ import ( client "github.com/coreos/etcd/clientv3" + "sunteng/commons/log" "sunteng/cronsun/conf" ) +var ( + leaseID client.LeaseID +) + +func ProcessKeepAlive() error { + if conf.Config.ProcTtl == 0 { + return nil + } + + resp, err := DefalutClient.Grant(context.TODO(), conf.Config.ProcTtl+5) + if err != nil { + return err + } + + leaseID = resp.ID + return nil +} + // 当前执行中的任务信息 // key: /cronsun/proc/node/job id/pid // value: 开始执行时间 @@ -18,6 +37,8 @@ type Process struct { JobID string `json:"job_id"` NodeID string `json:"node_id"` Time time.Time `json:"name"` // 开始执行时间 + + running bool } func (p *Process) Key() string { @@ -39,24 +60,33 @@ func (p *Process) Count() (int64, error) { } func (p *Process) Put() error { - if conf.Config.ProcTtl == 0 { + if leaseID == 0 { _, err := DefalutClient.Put(p.Key(), p.Val()) return err } - resp, err := DefalutClient.Grant(context.TODO(), conf.Config.ProcTtl) - if err != nil { - return err - } - - _, err = DefalutClient.Put(p.Key(), p.Val(), client.WithLease(resp.ID)) + _, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(leaseID)) return err } -func (p *Process) Start() error { - return nil +func (p *Process) Del() error { + _, err := DefalutClient.Delete(p.Key()) + return err +} + +func (p *Process) Start() { + if err := p.Put(); err != nil { + log.Warnf("proc put err: %s", err.Error()) + return + } + + p.running = true } func (p *Process) Stop() error { - return nil + if !p.running { + return nil + } + + return p.Del() }