From 45e7a8b1f576e7cd931700714cba927834562f78 Mon Sep 17 00:00:00 2001 From: miraclesu Date: Mon, 9 Jan 2017 18:51:52 +0800 Subject: [PATCH] =?UTF-8?q?node:=20=E8=B0=83=E6=95=B4=20node=20=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bin/node/server.go | 3 +- bin/web/server.go | 6 ++++ models/client.go | 2 +- models/node.go | 54 +++++++++++++++++++++++++++++++ node/node.go | 80 ++++++++-------------------------------------- 5 files changed, 77 insertions(+), 68 deletions(-) diff --git a/bin/node/server.go b/bin/node/server.go index 90f1a1c..6baf171 100644 --- a/bin/node/server.go +++ b/bin/node/server.go @@ -10,6 +10,7 @@ import ( "sunteng/commons/log" "sunteng/cronsun/conf" + "sunteng/cronsun/models" "sunteng/cronsun/node" ) @@ -23,7 +24,7 @@ func main() { //set cpu usage runtime.GOMAXPROCS(*gomax) - if err := conf.Init(); err != nil { + if err := models.Init(); err != nil { log.Error(err.Error()) return } diff --git a/bin/web/server.go b/bin/web/server.go index daa579e..a3d8c0d 100644 --- a/bin/web/server.go +++ b/bin/web/server.go @@ -8,10 +8,16 @@ import ( "sunteng/commons/event" "sunteng/commons/log" "sunteng/cronsun/conf" + "sunteng/cronsun/models" "sunteng/cronsun/web" ) func main() { + if err := models.Init(); err != nil { + log.Error(err.Error()) + return + } + l, err := net.Listen("tcp", conf.Config.Web.BindAddr) if err != nil { log.Error(err.Error()) diff --git a/models/client.go b/models/client.go index 14094d6..a30503b 100644 --- a/models/client.go +++ b/models/client.go @@ -68,7 +68,7 @@ func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, return c.Client.Get(ctx, key, opts...) } -func (c *Client) Del(key string, opts ...client.OpOption) (*client.DeleteResponse, error) { +func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) defer cancel() return c.Client.Delete(ctx, key, opts...) diff --git a/models/node.go b/models/node.go index ad3e588..a8a9314 100644 --- a/models/node.go +++ b/models/node.go @@ -1,8 +1,62 @@ package models +import ( + "os" + "strconv" + "syscall" + + client "github.com/coreos/etcd/clientv3" + + "sunteng/cronsun/conf" +) + // 执行 cron cmd 的进程 // 注册到 /cronsun/proc/ type Node struct { ID string `json:"-"` // ip PID string `json:"pid"` // 进程 pid } + +func (n *Node) String() string { + return "node[" + n.ID + "] pid[" + n.PID + "]" +} + +func (n *Node) Put(opts ...client.OpOption) (*client.PutResponse, error) { + return DefalutClient.Put(conf.Config.Proc+n.ID, n.PID, opts...) +} + +func (n *Node) Del() (*client.DeleteResponse, error) { + return DefalutClient.Delete(conf.Config.Proc+n.ID, client.WithFromKey()) +} + +// 判断 node 是否已注册到 etcd +// 存在则返回进行 pid,不存在返回 -1 +func (n *Node) Exist() (pid int, err error) { + resp, err := DefalutClient.Get(conf.Config.Proc+n.ID, client.WithFromKey()) + if err != nil { + return + } + + if len(resp.Kvs) == 0 { + return -1, nil + } + + if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil { + if _, err = DefalutClient.Delete(conf.Config.Proc+n.ID, client.WithFromKey()); err != nil { + return + } + return -1, nil + } + + p, err := os.FindProcess(pid) + if err != nil { + return -1, nil + } + + // TODO: 暂时不考虑 linux/unix 以外的系统 + if p != nil && p.Signal(syscall.Signal(0)) == nil { + return + } + + return -1, nil +} diff --git a/node/node.go b/node/node.go index a4b9a99..2c2f854 100644 --- a/node/node.go +++ b/node/node.go @@ -4,7 +4,6 @@ import ( "fmt" "os" "strconv" - "syscall" "time" "golang.org/x/net/context" @@ -14,18 +13,15 @@ import ( "sunteng/commons/log" "sunteng/commons/util" "sunteng/cronsun/conf" + "sunteng/cronsun/models" ) // Node 执行 cron 命令服务的结构体 type Node struct { - *client.Client + *models.Client + *models.Node - ttl int64 - reqTimeout time.Duration - prefix string - - Key string - PID string + ttl int64 lID client.LeaseID // lease id lch <-chan *client.LeaseKeepAliveResponse @@ -39,39 +35,28 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) { return } - cli, err := client.New(cfg.Etcd) - if err != nil { - return - } - n = &Node{ - Client: cli, - - ttl: cfg.Ttl, - reqTimeout: time.Duration(cfg.ReqTimeout) * time.Second, - prefix: cfg.Proc, - - Key: cfg.Proc + ip.String(), - PID: strconv.Itoa(os.Getpid()), + Client: models.DefalutClient, + Node: &models.Node{ + ID: ip.String(), + PID: strconv.Itoa(os.Getpid()), + }, + ttl: cfg.Ttl, done: make(chan struct{}), } return } -func (n *Node) String() string { - return "node[" + n.Key[len(n.prefix):] + "] pid[" + n.PID + "]" -} - // 注册到 /cronsun/proc/xx func (n *Node) Register() (err error) { - pid, err := n.Exist() + pid, err := n.Node.Exist() if err != nil { return } if pid != -1 { - return fmt.Errorf("node[%s] pid[%d] exist", n.Key[len(n.prefix):], pid) + return fmt.Errorf("node[%s] pid[%d] exist", n.Node.ID, pid) } resp, err := n.Client.Grant(context.TODO(), n.ttl) @@ -79,7 +64,7 @@ func (n *Node) Register() (err error) { return } - if _, err = n.Client.Put(context.TODO(), n.Key, n.PID, client.WithLease(resp.ID)); err != nil { + if _, err = n.Node.Put(client.WithLease(resp.ID)); err != nil { return } @@ -92,40 +77,6 @@ func (n *Node) Register() (err error) { return } -// 判断 node 是否已注册到 etcd -// 存在则返回进行 pid,不存在返回 -1 -func (n *Node) Exist() (pid int, err error) { - ctx, cancel := context.WithTimeout(context.Background(), n.reqTimeout) - resp, err := n.Client.Get(ctx, n.Key, client.WithFromKey()) - defer cancel() - if err != nil { - return - } - - if len(resp.Kvs) == 0 { - return -1, nil - } - - if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil { - if _, err = n.Client.Delete(ctx, n.Key, client.WithFromKey()); err != nil { - return - } - return -1, nil - } - - p, err := os.FindProcess(pid) - if err != nil { - return -1, nil - } - - // TODO: 暂时不考虑 linux/unix 以外的系统 - if p != nil && p.Signal(syscall.Signal(0)) == nil { - return - } - - return -1, nil -} - // 启动服务 func (n *Node) Run() { go n.keepAlive() @@ -156,9 +107,6 @@ func (n *Node) keepAlive() { // 启动服务 func (n *Node) Stop(i interface{}) { close(n.done) - // 防止断网时卡住 - ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) - n.Client.Delete(ctx, n.Key, client.WithFromKey()) - cancel() + n.Node.Del() n.Client.Close() }