proc: 实现配置更新

pull/1/head
miraclesu 2017-03-08 17:00:22 +08:00
parent 7e12c60415
commit a2c31890ef
4 changed files with 124 additions and 17 deletions

View File

@ -40,6 +40,10 @@ func main() {
return return
} }
if err = models.StartProc(); err != nil {
log.Warnf("[process key will not timeout]proc lease id set err: %s", err.Error())
}
if err = n.Run(); err != nil { if err = n.Run(); err != nil {
log.Error(err.Error()) log.Error(err.Error())
return return
@ -47,7 +51,9 @@ func main() {
log.Noticef("cronsun %s service started, Ctrl+C or send kill sign to exit", n.String()) log.Noticef("cronsun %s service started, Ctrl+C or send kill sign to exit", n.String())
// 注册退出事件 // 注册退出事件
event.On(event.EXIT, n.Stop, conf.Exit) event.On(event.EXIT, n.Stop, conf.Exit, models.Exit)
// 注册监听配置更新事件
event.On(event.WAIT, models.Reload)
// 监听退出信号 // 监听退出信号
event.Wait() event.Wait()
// 处理退出事件 // 处理退出事件

View File

@ -10,6 +10,7 @@ import (
"sunteng/commons/confutil" "sunteng/commons/confutil"
"sunteng/commons/db/imgo" "sunteng/commons/db/imgo"
"sunteng/commons/event"
"sunteng/commons/log" "sunteng/commons/log"
) )
@ -132,6 +133,7 @@ func (c *Conf) watch() error {
case <-timer.C: case <-timer.C:
if update { if update {
c.reload() c.reload()
event.Emit(event.WAIT, nil)
update = false update = false
} }
timer.Reset(duration) timer.Reset(duration)

View File

@ -203,6 +203,7 @@ func (j *Job) Run() {
p := &Process{ p := &Process{
ID: strconv.Itoa(cmd.Process.Pid), ID: strconv.Itoa(cmd.Process.Pid),
JobID: j.ID, JobID: j.ID,
Group: j.Group,
NodeID: j.runOn, NodeID: j.runOn,
Time: t, Time: t,
} }

View File

@ -2,6 +2,7 @@ package models
import ( import (
"context" "context"
"sync"
"time" "time"
client "github.com/coreos/etcd/clientv3" client "github.com/coreos/etcd/clientv3"
@ -11,21 +12,115 @@ import (
) )
var ( var (
leaseID client.LeaseID lID *leaseID
) )
func ProcessKeepAlive() error { // 维持 lease id 服务
if conf.Config.ProcTtl == 0 { func StartProc() error {
lID = &leaseID{
ttl: conf.Config.ProcTtl,
lk: new(sync.RWMutex),
done: make(chan struct{}),
}
if lID.ttl == 0 {
return nil return nil
} }
resp, err := DefalutClient.Grant(context.TODO(), conf.Config.ProcTtl+5) err := lID.set()
if err != nil { go lID.keepAlive()
return err return err
}
func Reload(i interface{}) {
if lID.ttl == conf.Config.ProcTtl {
return
} }
leaseID = resp.ID close(lID.done)
return nil lID.done, lID.ttl = make(chan struct{}), conf.Config.ProcTtl
if conf.Config.ProcTtl == 0 {
return
}
if err := lID.set(); err != nil {
log.Warnf("proc lease id set err: %s", err.Error())
}
go lID.keepAlive()
}
func Exit(i interface{}) {
if lID.done != nil {
close(lID.done)
}
}
type leaseID struct {
ttl int64
ID client.LeaseID
lk *sync.RWMutex
done chan struct{}
}
func (l *leaseID) get() client.LeaseID {
if l.ttl == 0 {
return -1
}
l.lk.RLock()
id := l.ID
l.lk.RUnlock()
return id
}
func (l *leaseID) set() error {
id := client.LeaseID(-1)
resp, err := DefalutClient.Grant(context.TODO(), l.ttl+2)
if err == nil {
id = resp.ID
}
l.lk.Lock()
l.ID = id
l.lk.Unlock()
return err
}
func (l *leaseID) keepAlive() {
duration := time.Duration(l.ttl)
timer := time.NewTimer(duration)
for {
select {
case <-l.done:
return
case <-timer.C:
if l.ttl == 0 {
return
}
id := l.get()
if id < 0 {
if err := l.set(); err != nil {
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
}
timer.Reset(duration)
continue
}
_, err := DefalutClient.KeepAliveOnce(context.TODO(), l.ID)
if err == nil {
timer.Reset(duration)
continue
}
log.Warnf("proc lease id keepAlive err: %s, try to reset...", err.Error())
if err = l.set(); err != nil {
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
}
timer.Reset(duration)
}
}
} }
// 当前执行中的任务信息 // 当前执行中的任务信息
@ -35,6 +130,7 @@ func ProcessKeepAlive() error {
type Process struct { type Process struct {
ID string `json:"id"` ID string `json:"id"`
JobID string `json:"job_id"` JobID string `json:"job_id"`
Group string `json:"group"`
NodeID string `json:"node_id"` NodeID string `json:"node_id"`
Time time.Time `json:"name"` // 开始执行时间 Time time.Time `json:"name"` // 开始执行时间
@ -42,7 +138,7 @@ type Process struct {
} }
func (p *Process) Key() string { func (p *Process) Key() string {
return conf.Config.Proc + p.NodeID + "/" + p.JobID + "/" + p.ID return conf.Config.Proc + p.NodeID + "/" + p.Group + "/" + p.JobID + "/" + p.ID
} }
func (p *Process) Val() string { func (p *Process) Val() string {
@ -51,7 +147,8 @@ func (p *Process) Val() string {
// 获取结点正在执行任务的数量 // 获取结点正在执行任务的数量
func (p *Process) Count() (int64, error) { func (p *Process) Count() (int64, error) {
resp, err := DefalutClient.Get(conf.Config.Proc + p.NodeID + "/" + p.JobID + "/") key := p.Key()
resp, err := DefalutClient.Get(key[:len(key)-len(p.ID)])
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -59,23 +156,24 @@ func (p *Process) Count() (int64, error) {
return resp.Count, nil return resp.Count, nil
} }
func (p *Process) Put() error { func (p *Process) put() error {
if leaseID == 0 { id := lID.get()
if id < 0 {
_, err := DefalutClient.Put(p.Key(), p.Val()) _, err := DefalutClient.Put(p.Key(), p.Val())
return err return err
} }
_, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(leaseID)) _, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
return err return err
} }
func (p *Process) Del() error { func (p *Process) del() error {
_, err := DefalutClient.Delete(p.Key()) _, err := DefalutClient.Delete(p.Key())
return err return err
} }
func (p *Process) Start() { func (p *Process) Start() {
if err := p.Put(); err != nil { if err := p.put(); err != nil {
log.Warnf("proc put err: %s", err.Error()) log.Warnf("proc put err: %s", err.Error())
return return
} }
@ -88,5 +186,5 @@ func (p *Process) Stop() error {
return nil return nil
} }
return p.Del() return p.del()
} }