From 952d4ee3d5e1fbe586b4521ef58180b536c1b906 Mon Sep 17 00:00:00 2001
From: miraclesu <suchuangji@gmail.com>
Date: Sat, 11 Feb 2017 17:59:19 +0800
Subject: [PATCH] =?UTF-8?q?node:=20=E4=BF=AE=E5=A4=8D=20watch=20bugs?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

context Background
OpOption WithPrevKV
---
 models/client.go |  4 +---
 models/group.go  |  2 +-
 models/job.go    | 18 +++++++++++++++++-
 node/node.go     | 11 +++++++----
 4 files changed, 26 insertions(+), 9 deletions(-)

diff --git a/models/client.go b/models/client.go
index 3787461..45c5542 100644
--- a/models/client.go
+++ b/models/client.go
@@ -100,9 +100,7 @@ func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResp
 }
 
 func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan {
-	ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
-	defer cancel()
-	return c.Client.Watch(ctx, key, opts...)
+	return c.Client.Watch(context.Background(), key, opts...)
 }
 
 func IsValidAsKeyPath(s string) bool {
diff --git a/models/group.go b/models/group.go
index 03edf7a..528c6db 100644
--- a/models/group.go
+++ b/models/group.go
@@ -62,7 +62,7 @@ func GetGroups(nid string) (groups map[string]*Group, err error) {
 }
 
 func WatchGroups() client.WatchChan {
-	return DefalutClient.Watch(conf.Config.Group, client.WithPrefix())
+	return DefalutClient.Watch(conf.Config.Group, client.WithPrefix(), client.WithPrevKV())
 }
 
 func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) {
diff --git a/models/job.go b/models/job.go
index 4b43493..e6ad79a 100644
--- a/models/job.go
+++ b/models/job.go
@@ -3,6 +3,7 @@ package models
 import (
 	"encoding/json"
 	"fmt"
+	"os/exec"
 	"strings"
 
 	client "github.com/coreos/etcd/clientv3"
@@ -106,7 +107,7 @@ func GetJobs() (jobs map[string]*Job, err error) {
 }
 
 func WatchJobs() client.WatchChan {
-	return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
+	return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix(), client.WithPrevKV())
 }
 
 func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
@@ -151,7 +152,22 @@ func (j *Job) GetID() string {
 	return j.ID
 }
 
+func (j *Job) String() string {
+	data, err := json.Marshal(j)
+	if err != nil {
+		return err.Error()
+	}
+	return string(data)
+}
+
 func (j *Job) Run() {
+	cmd := strings.Split(j.Command, " ")
+	out, err := exec.Command(cmd[0], cmd[1:]...).Output()
+	if err != nil {
+		return
+	}
+
+	fmt.Printf("%s\n", out)
 }
 
 func JobKey(group, id string) string {
diff --git a/node/node.go b/node/node.go
index 8e737ba..e52665c 100644
--- a/node/node.go
+++ b/node/node.go
@@ -163,6 +163,7 @@ func (n *Node) delJob(job *models.Job) {
 	n.delLink(gid, job.GetID())
 	delete(n.jobs, job.GetID())
 	n.Cron.DelJob(job)
+	log.Noticef("job[%s] has deleted", job)
 }
 
 func (n *Node) addGroup(g *models.Group) bool {
@@ -201,7 +202,9 @@ func (n *Node) watchJobs() {
 					continue
 				}
 
-				n.addJob(job)
+				if n.addJob(job) {
+					log.Noticef("job[%s] has added", job)
+				}
 			case ev.IsModify():
 				job, err := models.GetJobFromKv(ev.Kv)
 				if err != nil {
@@ -215,6 +218,7 @@ func (n *Node) watchJobs() {
 				}
 
 				if n.addJob(job) {
+					log.Noticef("job[%s] has added", job)
 					continue
 				}
 
@@ -235,9 +239,8 @@ func (n *Node) watchJobs() {
 	}
 }
 
-// TODO
 func (n *Node) watchGroups() {
-	rch := models.WatchJobs()
+	rch := models.WatchGroups()
 	for wresp := range rch {
 		for _, ev := range wresp.Events {
 			switch {
@@ -298,7 +301,7 @@ func (n *Node) Run() (err error) {
 
 	n.Cron.Start()
 	go n.watchJobs()
-	go n.watchGroups()
+	// go n.watchGroups()
 	return
 }