node: 修复 watch bugs

context Background
OpOption WithPrevKV
pull/1/head
miraclesu 2017-02-11 17:59:19 +08:00
parent 4fe4808413
commit 952d4ee3d5
4 changed files with 26 additions and 9 deletions

View File

@ -100,9 +100,7 @@ func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResp
}
func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Watch(ctx, key, opts...)
return c.Client.Watch(context.Background(), key, opts...)
}
func IsValidAsKeyPath(s string) bool {

View File

@ -62,7 +62,7 @@ func GetGroups(nid string) (groups map[string]*Group, err error) {
}
func WatchGroups() client.WatchChan {
return DefalutClient.Watch(conf.Config.Group, client.WithPrefix())
return DefalutClient.Watch(conf.Config.Group, client.WithPrefix(), client.WithPrevKV())
}
func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) {

View File

@ -3,6 +3,7 @@ package models
import (
"encoding/json"
"fmt"
"os/exec"
"strings"
client "github.com/coreos/etcd/clientv3"
@ -106,7 +107,7 @@ func GetJobs() (jobs map[string]*Job, err error) {
}
func WatchJobs() client.WatchChan {
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix(), client.WithPrevKV())
}
func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
@ -151,7 +152,22 @@ func (j *Job) GetID() string {
return j.ID
}
func (j *Job) String() string {
data, err := json.Marshal(j)
if err != nil {
return err.Error()
}
return string(data)
}
func (j *Job) Run() {
cmd := strings.Split(j.Command, " ")
out, err := exec.Command(cmd[0], cmd[1:]...).Output()
if err != nil {
return
}
fmt.Printf("%s\n", out)
}
func JobKey(group, id string) string {

View File

@ -163,6 +163,7 @@ func (n *Node) delJob(job *models.Job) {
n.delLink(gid, job.GetID())
delete(n.jobs, job.GetID())
n.Cron.DelJob(job)
log.Noticef("job[%s] has deleted", job)
}
func (n *Node) addGroup(g *models.Group) bool {
@ -201,7 +202,9 @@ func (n *Node) watchJobs() {
continue
}
n.addJob(job)
if n.addJob(job) {
log.Noticef("job[%s] has added", job)
}
case ev.IsModify():
job, err := models.GetJobFromKv(ev.Kv)
if err != nil {
@ -215,6 +218,7 @@ func (n *Node) watchJobs() {
}
if n.addJob(job) {
log.Noticef("job[%s] has added", job)
continue
}
@ -235,9 +239,8 @@ func (n *Node) watchJobs() {
}
}
// TODO
func (n *Node) watchGroups() {
rch := models.WatchJobs()
rch := models.WatchGroups()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
@ -298,7 +301,7 @@ func (n *Node) Run() (err error) {
n.Cron.Start()
go n.watchJobs()
go n.watchGroups()
// go n.watchGroups()
return
}