node: 优化 keepAlive 策略

pull/1/head
miraclesu 2017-03-14 15:23:53 +08:00
parent 6287def1b1
commit b79b248190
4 changed files with 58 additions and 52 deletions

View File

@ -104,6 +104,9 @@ func (c *Conf) parse() error {
if c.Etcd.DialTimeout > 0 {
c.Etcd.DialTimeout *= time.Second
}
if c.Ttl <= 0 {
c.Ttl = 10
}
log.InitConf(c.Log)
c.Cmd = cleanKeyPrefix(c.Cmd)

View File

@ -258,13 +258,12 @@ func (j *Job) Run() {
Time: t,
}
proc.Start()
defer proc.Stop()
if err := cmd.Wait(); err != nil {
proc.Stop()
j.Fail(t, fmt.Sprintf("%s", err.Error()))
return
}
proc.Stop()
j.Success(t, b.String())
}

View File

@ -102,23 +102,20 @@ func (l *leaseID) keepAlive() {
}
id := l.get()
if id < 0 {
if err := l.set(); err != nil {
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
if id > 0 {
_, err := DefalutClient.KeepAliveOnce(context.TODO(), l.ID)
if err == nil {
timer.Reset(duration)
continue
}
timer.Reset(duration)
continue
log.Warnf("proc lease id[%x] keepAlive err: %s, try to reset...", id, err.Error())
}
_, err := DefalutClient.KeepAliveOnce(context.TODO(), l.ID)
if err == nil {
timer.Reset(duration)
continue
}
log.Warnf("proc lease id keepAlive err: %s, try to reset...", err.Error())
if err = l.set(); err != nil {
if err := l.set(); err != nil {
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
} else {
log.Noticef("proc set lease id[%x] success", l.get())
}
timer.Reset(duration)
}

View File

@ -31,11 +31,8 @@ type Node struct {
// 删除的 job id用于 group 更新
delIDs map[string]bool
ttl int64
lID client.LeaseID // lease id
lch <-chan *client.LeaseKeepAliveResponse
ttl int64
lID client.LeaseID // lease id
done chan struct{}
}
@ -53,8 +50,12 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) {
},
Cron: cron.New(),
jobs: make(Jobs, 8),
cmds: make(map[string]*models.Cmd),
link: newLink(8),
delIDs: make(map[string]bool, 8),
ttl: cfg.Ttl,
done: make(chan struct{}),
}
@ -72,22 +73,51 @@ func (n *Node) Register() (err error) {
return fmt.Errorf("node[%s] pid[%d] exist", n.Node.ID, pid)
}
resp, err := n.Client.Grant(context.TODO(), n.ttl)
return n.set()
}
func (n *Node) set() error {
resp, err := n.Client.Grant(context.TODO(), n.ttl+2)
if err != nil {
return
return err
}
if _, err = n.Node.Put(client.WithLease(resp.ID)); err != nil {
return
return err
}
ch, err := n.Client.KeepAlive(context.TODO(), resp.ID)
if err != nil {
return
}
n.lID = resp.ID
return nil
}
n.lID, n.lch = resp.ID, ch
return
// keepAlive renews the node's lease every n.ttl seconds and re-registers the
// node when renewal fails (for example after a network outage).
//
// On each tick, if a lease id is held it is renewed with KeepAliveOnce. When
// that fails the id is cleared and n.set is called to establish a new one; if
// n.set fails as well, it is retried on the next tick. The loop returns once
// n.done is ready.
func (n *Node) keepAlive() {
	duration := time.Duration(n.ttl) * time.Second
	timer := time.NewTimer(duration)
	// Release the timer when leaving through n.done instead of keeping it
	// pending until it fires.
	defer timer.Stop()

	for {
		select {
		case <-n.done:
			return
		case <-timer.C:
		}

		if n.lID > 0 {
			_, err := n.Client.KeepAliveOnce(context.TODO(), n.lID)
			if err == nil {
				timer.Reset(duration)
				continue
			}

			log.Warnf("%s lid[%x] keepAlive err: %s, try to reset...", n.String(), n.lID, err.Error())
			// Forget the failed id so later ticks skip renewal and go
			// straight to n.set.
			n.lID = 0
		}

		if err := n.set(); err != nil {
			log.Warnf("%s set lid err: %s, try to reset after %d seconds...", n.String(), err.Error(), n.ttl)
		} else {
			log.Noticef("%s set lid[%x] success", n.String(), n.lID)
		}
		timer.Reset(duration)
	}
}
func (n *Node) loadJobs() (err error) {
@ -100,7 +130,6 @@ func (n *Node) loadJobs() (err error) {
return
}
n.jobs, n.link = make(Jobs, len(jobs)), newLink(len(n.groups))
if len(jobs) == 0 {
return
}
@ -433,28 +462,6 @@ func (n *Node) Run() (err error) {
return
}
// keepAlive re-registers the node after its lease keep-alive channel closes
// (for example after a network outage).
//
// Each round it discards everything received on n.lch until that channel is
// closed, returns if n.done is ready, and otherwise sleeps n.ttl+1 seconds
// before calling n.Register and logging the outcome.
func (n *Node) keepAlive() {
	for {
		// Blocks here, dropping responses, until n.lch is closed.
		for range n.lch {
		}

		// Non-blocking check: stop if shutdown has been signalled.
		select {
		case <-n.done:
			return
		default:
		}

		wait := time.Duration(n.ttl+1) * time.Second
		time.Sleep(wait)
		log.Noticef("%s has dropped, try to reconnect...", n.String())

		err := n.Register()
		if err == nil {
			log.Noticef("%s reconnected", n.String())
			continue
		}
		log.Warn(err.Error())
	}
}
// 停止服务
func (n *Node) Stop(i interface{}) {
n.Node.Down()