From 597054156201a540c8df55deda1117cbc0ddeef3 Mon Sep 17 00:00:00 2001 From: miraclesu Date: Thu, 5 Jan 2017 20:35:41 +0800 Subject: [PATCH] =?UTF-8?q?node:=20=E8=8A=82=E7=82=B9=E6=B3=A8=E5=86=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/node/server.go | 2 +- conf/conf.go | 2 + conf/files/base.json.sample | 3 +- conf/files/etcd.json.sample | 8 ++-- node/node.go | 81 ++++++++++++++++++++++++++++++++----- 5 files changed, 81 insertions(+), 15 deletions(-) diff --git a/bin/node/server.go b/bin/node/server.go index 9d9400e..921fa31 100644 --- a/bin/node/server.go +++ b/bin/node/server.go @@ -41,7 +41,7 @@ func main() { go n.Run() - log.Noticef("cronsun node[%s] pid[%d] service started, Ctrl+C or send kill sign to exit", n.IP, n.PID) + log.Noticef("cronsun node[%s] pid[%s] service started, Ctrl+C or send kill sign to exit", n.Key, n.PID) // 注册退出事件 event.On(event.EXIT, n.Stop) // 监听退出信号 diff --git a/conf/conf.go b/conf/conf.go index b1b5cbc..02b2e1e 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -38,6 +38,8 @@ type Conf struct { Proc string // proc 路径 Cmd string // cmd 路径 + Ttl int64 // 节点超时时间,单位秒 + Log log.Config Etcd client.Config } diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample index 8300719..2a24cf9 100644 --- a/conf/files/base.json.sample +++ b/conf/files/base.json.sample @@ -1,6 +1,7 @@ { "Proc": "/cronsun/proc", - "Cmd": "cronsun/cmd", + "Cmd": "/cronsun/cmd", + "Timeout": 10, "Log": "@extend:log.json", "Etcd": "@extend:etcd.json" } diff --git a/conf/files/etcd.json.sample b/conf/files/etcd.json.sample index 95a5b80..60c4a26 100644 --- a/conf/files/etcd.json.sample +++ b/conf/files/etcd.json.sample @@ -1,11 +1,11 @@ { "Endpoints":[ - "http://192.168.11.27:2379", - "http://192.168.11.28:2379", - "http://192.168.11.29:2379" + "http://192.168.11.27:2389", + "http://192.168.11.28:2389", + "http://192.168.11.29:2389" ], "Username":"", "Password":"", "#DialTimeout":"单位秒", - "DialTimeout": 5 + "DialTimeout": 2 } diff --git a/node/node.go b/node/node.go index 4a17941..2ddd3c3 100644 --- a/node/node.go +++ b/node/node.go @@ -1,19 +1,33 @@ package node import ( + "fmt" "os" + "strconv" + "time" + + "golang.org/x/net/context" client "github.com/coreos/etcd/clientv3" "sunteng/commons/util" + "sunteng/cronsun/conf" +) + +const ( + ReqTimeout = 2 * time.Second + + Spliter = "/" ) // Node 执行 cron 命令服务的结构体 type Node struct { *client.Client - IP string - PID int + Key string + PID string + + lID client.LeaseID // lease id } func NewNode(cfg client.Config) (n *Node, err error) { @@ -30,21 +44,69 @@ func NewNode(cfg client.Config) (n *Node, err error) { n = &Node{ Client: cli, - IP: ip.String(), - PID: os.Getpid(), + Key: conf.Config.Proc + Spliter + ip.String(), + PID: strconv.Itoa(os.Getpid()), } return } // 注册到 /cronsun/proc/xx -func (n *Node) Register() error { - return nil +func (n *Node) Register() (err error) { + pid, err := n.Exist() + if err != nil { + return + } + + if pid != -1 { + return fmt.Errorf("node[%s] pid[%d] exist", n.Key, pid) + } + + resp, err := n.Client.Grant(context.TODO(), conf.Config.Ttl) + if err != nil { + return + } + + if _, err = n.Client.Put(context.TODO(), n.Key, n.PID, client.WithLease(resp.ID)); err != nil { + return + } + if _, err = n.Client.KeepAlive(context.TODO(), resp.ID); err != nil { + return + } + n.lID = resp.ID + + return } -// 更新 /cronsun/proc/xx/time -// 用于检查 node 是否存活 -func (n *Node) Heartbeat() { +// 判断 node 是否已注册到 etcd +// 存在则返回进行 pid,不存在返回 -1 +func (n *Node) Exist() (pid int, err error) { + ctx, cancel := context.WithTimeout(context.Background(), ReqTimeout) + resp, err := n.Client.Get(ctx, n.Key, client.WithFromKey()) + defer cancel() + if err != nil { + return + } + if len(resp.Kvs) == 0 { + return -1, nil + } + + if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil { + if _, err = n.Client.Delete(ctx, n.Key, client.WithFromKey()); err != nil { + return + } + } + + p, err := os.FindProcess(pid) + if err != nil { + return + } + + if p != nil { + return + } + + return -1, nil } // 启动服务 @@ -54,5 +116,6 @@ func (n *Node) Run() { // 启动服务 func (n *Node) Stop(i interface{}) { + n.Client.Revoke(context.TODO(), n.lID) n.Client.Close() }