job: 支持任务同一时间段内只在一台机上运行

pull/1/head
miraclesu 8 years ago
parent b76213d699
commit c566765cb9

@ -111,7 +111,7 @@ func (c *Conf) parse() error {
if c.Ttl <= 0 {
c.Ttl = 10
}
if c.LockTtl <= 0 {
if c.LockTtl < 2 {
c.LockTtl = 300
}
log.InitConf(c.Log)

@ -92,12 +92,12 @@ func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveRespons
return c.Client.KeepAliveOnce(ctx, id)
}
func (c *Client) GetLock(key string) (bool, error) {
func (c *Client) GetLock(key string, id client.LeaseID) (bool, error) {
key = conf.Config.Lock + key
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
resp, err := DefalutClient.Txn(ctx).
If(client.Compare(client.CreateRevision(key), "=", 0)).
Then(client.OpPut(key, "")).
Then(client.OpPut(key, "", client.WithLease(id))).
Commit()
cancel()

@ -79,6 +79,41 @@ type JobRule struct {
Schedule cron.Schedule `json:"-"`
}
// 单机任务锁
type locker struct {
ttl int64
lID client.LeaseID
timer *time.Timer
done chan struct{}
}
func (l *locker) keepAlive() {
duration := time.Duration(l.ttl)*time.Second - 500*time.Millisecond
l.timer = time.NewTimer(duration)
for {
select {
case <-l.done:
return
case <-l.timer.C:
_, err := DefalutClient.KeepAliveOnce(l.lID)
if err != nil {
log.Warnf("lock keep alive err: %s", err.Error())
return
}
l.timer.Reset(duration)
}
}
}
func (l *locker) unlock() {
close(l.done)
l.timer.Stop()
l.done = make(chan struct{})
if _, err := DefalutClient.KeepAliveOnce(l.lID); err != nil {
log.Warnf("unlock keep alive err: %s", err.Error())
}
}
type Cmd struct {
*Job
*JobRule
@ -89,6 +124,11 @@ func (c *Cmd) GetID() string {
}
func (c *Cmd) Run() {
// 单机任务只有第一个 rule 生效
if c.Job.Kind == KindAlone && c.Job.Rules[0].ID != c.JobRule.ID {
return
}
// 同时执行任务数限制
if c.Job.limit() {
return
@ -276,21 +316,62 @@ func (j *Job) lockTtl() int64 {
prev := j.Rules[0].Schedule.Next(now)
ttl := int64(j.Rules[0].Schedule.Next(prev).Sub(prev) / time.Second)
cost := j.AvgTime / 1e3
if j.AvgTime/1e3-cost*1e3 > 0 {
cost += 1
}
// 如果执行间隔时间不大于执行时间,把过期时间设置为执行时间的下限-1
// 以便下次执行的时候,能获取到 lock
if ttl >= cost {
ttl -= cost
}
if ttl > conf.Config.LockTtl {
ttl = conf.Config.LockTtl
}
// 支持的最小时间间隔 2s
if ttl < 2 {
ttl = 2
}
println("======ttll", ttl)
return ttl
}
func (j *Job) lock() bool {
ok, err := DefalutClient.GetLock(j.ID)
if err != nil {
log.Noticef("job[%s] didn't get a lock", j.Key())
func (j *Job) newLock() *locker {
return &locker{
ttl: j.lockTtl(),
done: make(chan struct{}),
}
return ok
}
func (j *Job) unlock() {
if err := DefalutClient.DelLock(j.ID); err != nil {
log.Noticef("job[%s] del a lock err: %s", j.Key(), err.Error())
func (j *Job) lock() *locker {
lk := j.newLock()
// 非法的 rule
if lk.ttl == 0 {
return nil
}
resp, err := DefalutClient.Grant(lk.ttl)
if err != nil {
log.Noticef("job[%s] didn't get a lock, err: %s", j.Key(), err.Error())
return nil
}
ok, err := DefalutClient.GetLock(j.ID, resp.ID)
if err != nil {
log.Noticef("job[%s] didn't get a lock, err: %s", j.Key(), err.Error())
return nil
}
if !ok {
return nil
}
lk.lID = resp.ID
go lk.keepAlive()
return lk
}
// Run 执行任务
@ -325,10 +406,11 @@ func (j *Job) Run() bool {
}
if j.Kind == KindAlone {
if !j.lock() {
return true
lk := j.lock()
if lk == nil {
return false
}
defer j.unlock()
defer lk.unlock()
}
// 超时控制

Loading…
Cancel
Save