job: 一个时间间隔内只允许运行一次任务

增加此类型支持
pull/1/head
miraclesu 8 years ago
parent c566765cb9
commit fa8a6451ee

@ -14,8 +14,8 @@
"ProcTtl": 600,
"#ProcReq": "记录任务执行中的信息的执行时间阀值单位秒0 为不限制",
"ProcReq": 5,
"#LockTtl": "单机任务锁过期时间,单位秒,默认 300",
"LockTtl": 300,
"#LockTtl": "任务锁最大过期时间,单位秒,默认 600",
"LockTtl": 600,
"Log": "@extend:log.json",
"Etcd": "@extend:etcd.json",
"Mgo": "@extend:db.json",

@ -28,8 +28,9 @@ const (
)
const (
KindCommon = iota
KindAlone // 单机执行
KindCommon = iota
KindAlone // 任何时间段只允许单机执行
KindInterval // 一个任务执行间隔内允许执行一次
)
// 需要执行的 cron cmd 命令
@ -56,7 +57,6 @@ type Job struct {
// 0: 普通任务
// 1: 单机任务
// 如果为单机任务node 加载任务的时候 Parallels 设置 1
// 只支持 1 个 JobRule
Kind int `json:"kind"`
// 平均执行时间,单位 ms
AvgTime int64 `json:"avg_time"`
@ -79,8 +79,9 @@ type JobRule struct {
Schedule cron.Schedule `json:"-"`
}
// 单机任务锁
// 任务锁
type locker struct {
kind int
ttl int64
lID client.LeaseID
timer *time.Timer
@ -106,9 +107,12 @@ func (l *locker) keepAlive() {
}
func (l *locker) unlock() {
if l.kind != KindAlone {
return
}
close(l.done)
l.timer.Stop()
l.done = make(chan struct{})
if _, err := DefalutClient.KeepAliveOnce(l.lID); err != nil {
log.Warnf("unlock keep alive err: %s", err.Error())
}
@ -124,17 +128,20 @@ func (c *Cmd) GetID() string {
}
func (c *Cmd) Run() {
// 单机任务只有第一个 rule 生效
if c.Job.Kind == KindAlone && c.Job.Rules[0].ID != c.JobRule.ID {
return
}
// 同时执行任务数限制
if c.Job.limit() {
return
}
defer c.Job.unlimit()
if c.Job.Kind != KindCommon {
lk := c.lock()
if lk == nil {
return
}
defer lk.unlock()
}
if c.Job.Retry <= 0 {
c.Job.Run()
return
@ -151,6 +158,107 @@ func (c *Cmd) Run() {
}
}
func (j *Job) limit() bool {
if j.Parallels == 0 {
return false
}
count := atomic.LoadInt64(&j.count)
if j.Parallels <= count {
j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count))
return true
}
atomic.AddInt64(&j.count, 1)
return false
}
func (j *Job) unlimit() {
if j.Parallels == 0 {
return
}
atomic.AddInt64(&j.count, -1)
}
func (c *Cmd) lockTtl() int64 {
now := time.Now()
prev := c.JobRule.Schedule.Next(now)
ttl := int64(c.JobRule.Schedule.Next(prev).Sub(prev) / time.Second)
if ttl == 0 {
return 0
}
if c.Job.Kind == KindInterval {
ttl -= 2
if ttl > conf.Config.LockTtl {
ttl = conf.Config.LockTtl
}
if ttl < 1 {
ttl = 1
}
return ttl
}
cost := c.Job.AvgTime / 1e3
if c.Job.AvgTime/1e3-cost*1e3 > 0 {
cost += 1
}
// 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1
// 以便下次执行的时候,能获取到 lock
if ttl >= cost {
ttl -= cost
}
if ttl > conf.Config.LockTtl {
ttl = conf.Config.LockTtl
}
// 支持的最小时间间隔 2s
if ttl < 2 {
ttl = 2
}
return ttl
}
func (c *Cmd) newLock() *locker {
return &locker{
kind: c.Job.Kind,
ttl: c.lockTtl(),
done: make(chan struct{}),
}
}
func (c *Cmd) lock() *locker {
lk := c.newLock()
// 非法的 rule
if lk.ttl == 0 {
return nil
}
resp, err := DefalutClient.Grant(lk.ttl)
if err != nil {
log.Noticef("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error())
return nil
}
ok, err := DefalutClient.GetLock(c.Job.ID, resp.ID)
if err != nil {
log.Noticef("job[%s] didn't get a lock, err: %s", c.Job.Key(), err.Error())
return nil
}
if !ok {
return nil
}
lk.lID = resp.ID
if lk.kind == KindAlone {
go lk.keepAlive()
}
return lk
}
// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理
func (j *JobRule) included(nid string, gs map[string]*Group) bool {
for i, count := 0, len(j.NodeIDs); i < count; i++ {
@ -285,95 +393,6 @@ func (j *Job) String() string {
return string(data)
}
func (j *Job) limit() bool {
if j.Parallels == 0 {
return false
}
count := atomic.LoadInt64(&j.count)
if j.Parallels <= count {
j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count))
return true
}
atomic.AddInt64(&j.count, 1)
return false
}
func (j *Job) unlimit() {
if j.Parallels == 0 {
return
}
atomic.AddInt64(&j.count, -1)
}
func (j *Job) lockTtl() int64 {
if len(j.Rules) == 0 {
return 0
}
now := time.Now()
prev := j.Rules[0].Schedule.Next(now)
ttl := int64(j.Rules[0].Schedule.Next(prev).Sub(prev) / time.Second)
cost := j.AvgTime / 1e3
if j.AvgTime/1e3-cost*1e3 > 0 {
cost += 1
}
// 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1
// 以便下次执行的时候,能获取到 lock
if ttl >= cost {
ttl -= cost
}
if ttl > conf.Config.LockTtl {
ttl = conf.Config.LockTtl
}
// 支持的最小时间间隔 2s
if ttl < 2 {
ttl = 2
}
println("======ttll", ttl)
return ttl
}
func (j *Job) newLock() *locker {
return &locker{
ttl: j.lockTtl(),
done: make(chan struct{}),
}
}
func (j *Job) lock() *locker {
lk := j.newLock()
// 非法的 rule
if lk.ttl == 0 {
return nil
}
resp, err := DefalutClient.Grant(lk.ttl)
if err != nil {
log.Noticef("job[%s] didn't get a lock, err: %s", j.Key(), err.Error())
return nil
}
ok, err := DefalutClient.GetLock(j.ID, resp.ID)
if err != nil {
log.Noticef("job[%s] didn't get a lock, err: %s", j.Key(), err.Error())
return nil
}
if !ok {
return nil
}
lk.lID = resp.ID
go lk.keepAlive()
return lk
}
// Run 执行任务
func (j *Job) Run() bool {
var (
@ -405,14 +424,6 @@ func (j *Job) Run() bool {
}
}
if j.Kind == KindAlone {
lk := j.lock()
if lk == nil {
return false
}
defer lk.unlock()
}
// 超时控制
if j.Timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second)

Loading…
Cancel
Save