From fa8a6451ee5cca0e2be78aaa216d16e714073078 Mon Sep 17 00:00:00 2001 From: miraclesu Date: Tue, 21 Mar 2017 17:24:49 +0800 Subject: [PATCH] =?UTF-8?q?job:=20=E4=B8=80=E4=B8=AA=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E9=97=B4=E9=9A=94=E5=86=85=E5=8F=AA=E5=85=81=E8=AE=B8=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E4=B8=80=E6=AC=A1=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 增加此类型支持 --- conf/files/base.json.sample | 4 +- models/job.go | 225 +++++++++++++++++++----------------- 2 files changed, 120 insertions(+), 109 deletions(-) diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample index 69cf022..ab4e4df 100644 --- a/conf/files/base.json.sample +++ b/conf/files/base.json.sample @@ -14,8 +14,8 @@ "ProcTtl": 600, "#ProcReq": "记录任务执行中的信息的执行时间阀值,单位秒,0 为不限制", "ProcReq": 5, - "#LockTtl": "单机任务锁过期时间,单位秒,默认 300", - "LockTtl": 300, + "#LockTtl": "任务锁最大过期时间,单位秒,默认 600", + "LockTtl": 600, "Log": "@extend:log.json", "Etcd": "@extend:etcd.json", "Mgo": "@extend:db.json", diff --git a/models/job.go b/models/job.go index 1a7b4bf..9a624d0 100644 --- a/models/job.go +++ b/models/job.go @@ -28,8 +28,9 @@ const ( ) const ( - KindCommon = iota - KindAlone // 单机执行 + KindCommon = iota + KindAlone // 任何时间段只允许单机执行 + KindInterval // 一个任务执行间隔内允许执行一次 ) // 需要执行的 cron cmd 命令 @@ -56,7 +57,6 @@ type Job struct { // 0: 普通任务 // 1: 单机任务 // 如果为单机任务,node 加载任务的时候 Parallels 设置 1 - // 只支持 1 个 JobRule Kind int `json:"kind"` // 平均执行时间,单位 ms AvgTime int64 `json:"avg_time"` @@ -79,8 +79,9 @@ type JobRule struct { Schedule cron.Schedule `json:"-"` } -// 单机任务锁 +// 任务锁 type locker struct { + kind int ttl int64 lID client.LeaseID timer *time.Timer @@ -106,9 +107,12 @@ func (l *locker) keepAlive() { } func (l *locker) unlock() { + if l.kind != KindAlone { + return + } + close(l.done) l.timer.Stop() - l.done = make(chan struct{}) if _, err := DefalutClient.KeepAliveOnce(l.lID); err != nil { log.Warnf("unlock keep alive err: %s", err.Error()) } @@ -124,17 +128,20 @@ func (c *Cmd) GetID() string { } func (c *Cmd) Run() { - // 单机任务只有第一个 rule 生效 - if c.Job.Kind == KindAlone && c.Job.Rules[0].ID != c.JobRule.ID { - return - } - // 同时执行任务数限制 if c.Job.limit() { return } defer c.Job.unlimit() + if c.Job.Kind != KindCommon { + lk := c.lock() + if lk == nil { + return + } + defer lk.unlock() + } + if c.Job.Retry <= 0 { c.Job.Run() return @@ -151,6 +158,107 @@ func (c *Cmd) Run() { } } +func (j *Job) limit() bool { + if j.Parallels == 0 { + return false + } + + count := atomic.LoadInt64(&j.count) + if j.Parallels <= count { + j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count)) + return true + } + + atomic.AddInt64(&j.count, 1) + return false +} + +func (j *Job) unlimit() { + if j.Parallels == 0 { + return + } + atomic.AddInt64(&j.count, -1) +} + +func (c *Cmd) lockTtl() int64 { + now := time.Now() + prev := c.JobRule.Schedule.Next(now) + ttl := int64(c.JobRule.Schedule.Next(prev).Sub(prev) / time.Second) + if ttl == 0 { + return 0 + } + + if c.Job.Kind == KindInterval { + ttl -= 2 + if ttl > conf.Config.LockTtl { + ttl = conf.Config.LockTtl + } + if ttl < 1 { + ttl = 1 + } + return ttl + } + + cost := c.Job.AvgTime / 1e3 + if c.Job.AvgTime/1e3-cost*1e3 > 0 { + cost += 1 + } + // 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1 + // 以便下次执行的时候,能获取到 lock + if ttl >= cost { + ttl -= cost + } + + if ttl > conf.Config.LockTtl { + ttl = conf.Config.LockTtl + } + + // 支持的最小时间间隔 2s + if ttl < 2 { + ttl = 2 + } + + return ttl +} + +func (c *Cmd) newLock() *locker { + return &locker{ + kind: c.Job.Kind, + ttl: c.lockTtl(), + done: make(chan struct{}), + } +} + +func (c *Cmd) lock() *locker { + lk := c.newLock() + // 非法的 rule + if lk.ttl == 0 { + return nil + } + + resp, err := DefalutClient.Grant(lk.ttl) + if err != nil { + log.Noticef("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) + return nil + } + + ok, err := DefalutClient.GetLock(c.Job.ID, resp.ID) + if err != nil { + log.Noticef("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error()) + return nil + } + + if !ok { + return nil + } + + lk.lID = resp.ID + if lk.kind == KindAlone { + go lk.keepAlive() + } + return lk +} + // 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理 func (j *JobRule) included(nid string, gs map[string]*Group) bool { for i, count := 0, len(j.NodeIDs); i < count; i++ { @@ -285,95 +393,6 @@ func (j *Job) String() string { return string(data) } -func (j *Job) limit() bool { - if j.Parallels == 0 { - return false - } - - count := atomic.LoadInt64(&j.count) - if j.Parallels <= count { - j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count)) - return true - } - - atomic.AddInt64(&j.count, 1) - return false -} - -func (j *Job) unlimit() { - if j.Parallels == 0 { - return - } - atomic.AddInt64(&j.count, -1) -} - -func (j *Job) lockTtl() int64 { - if len(j.Rules) == 0 { - return 0 - } - - now := time.Now() - prev := j.Rules[0].Schedule.Next(now) - ttl := int64(j.Rules[0].Schedule.Next(prev).Sub(prev) / time.Second) - - cost := j.AvgTime / 1e3 - if j.AvgTime/1e3-cost*1e3 > 0 { - cost += 1 - } - // 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1 - // 以便下次执行的时候,能获取到 lock - if ttl >= cost { - ttl -= cost - } - - if ttl > conf.Config.LockTtl { - ttl = conf.Config.LockTtl - } - - // 支持的最小时间间隔 2s - if ttl < 2 { - ttl = 2 - } - println("======ttll", ttl) - - return ttl -} - -func (j *Job) newLock() *locker { - return &locker{ - ttl: j.lockTtl(), - done: make(chan struct{}), - } -} - -func (j *Job) lock() *locker { - lk := j.newLock() - // 非法的 rule - if lk.ttl == 0 { - return nil - } - - resp, err := DefalutClient.Grant(lk.ttl) - if err != nil { - log.Noticef("job[%s] didn't get a lock, err: %s", j.Key(), err.Error()) - return nil - } - - ok, err := DefalutClient.GetLock(j.ID, resp.ID) - if err != nil { - log.Noticef("job[%s] didn't get a lock, err: %s", j.Key(), err.Error()) - return nil - } - - if !ok { - return nil - } - - lk.lID = resp.ID - go lk.keepAlive() - return lk -} - // Run 执行任务 func (j *Job) Run() bool { var ( @@ -405,14 +424,6 @@ func (j *Job) Run() bool { } } - if j.Kind == KindAlone { - lk := j.lock() - if lk == nil { - return false - } - defer lk.unlock() - } - // 超时控制 if j.Timeout > 0 { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second)