node: 节点注册

pull/1/head
miraclesu 2017-01-05 20:35:41 +08:00
parent 5302689406
commit 5970541562
5 changed files with 81 additions and 15 deletions

View File

@ -41,7 +41,7 @@ func main() {
go n.Run()
log.Noticef("cronsun node[%s] pid[%d] service started, Ctrl+C or send kill sign to exit", n.IP, n.PID)
log.Noticef("cronsun node[%s] pid[%s] service started, Ctrl+C or send kill sign to exit", n.Key, n.PID)
// 注册退出事件
event.On(event.EXIT, n.Stop)
// 监听退出信号

View File

@ -38,6 +38,8 @@ type Conf struct {
Proc string // proc 路径
Cmd string // cmd 路径
Ttl int64 // 节点超时时间,单位秒
Log log.Config
Etcd client.Config
}

View File

@ -1,6 +1,7 @@
{
"Proc": "/cronsun/proc",
"Cmd": "cronsun/cmd",
"Cmd": "/cronsun/cmd",
"Timeout": 10,
"Log": "@extend:log.json",
"Etcd": "@extend:etcd.json"
}

View File

@ -1,11 +1,11 @@
{
"Endpoints":[
"http://192.168.11.27:2379",
"http://192.168.11.28:2379",
"http://192.168.11.29:2379"
"http://192.168.11.27:2389",
"http://192.168.11.28:2389",
"http://192.168.11.29:2389"
],
"Username":"",
"Password":"",
"#DialTimeout":"单位秒",
"DialTimeout": 5
"DialTimeout": 2
}

View File

@ -1,19 +1,33 @@
package node
import (
"fmt"
"os"
"strconv"
"time"
"golang.org/x/net/context"
client "github.com/coreos/etcd/clientv3"
"sunteng/commons/util"
"sunteng/cronsun/conf"
)
const (
ReqTimeout = 2 * time.Second
Spliter = "/"
)
// Node 执行 cron 命令服务的结构体
type Node struct {
*client.Client
IP string
PID int
Key string
PID string
lID client.LeaseID // lease id
}
func NewNode(cfg client.Config) (n *Node, err error) {
@ -30,21 +44,69 @@ func NewNode(cfg client.Config) (n *Node, err error) {
n = &Node{
Client: cli,
IP: ip.String(),
PID: os.Getpid(),
Key: conf.Config.Proc + Spliter + ip.String(),
PID: strconv.Itoa(os.Getpid()),
}
return
}
// 注册到 /cronsun/proc/xx
func (n *Node) Register() error {
return nil
func (n *Node) Register() (err error) {
pid, err := n.Exist()
if err != nil {
return
}
if pid != -1 {
return fmt.Errorf("node[%s] pid[%d] exist", n.Key, pid)
}
resp, err := n.Client.Grant(context.TODO(), conf.Config.Ttl)
if err != nil {
return
}
if _, err = n.Client.Put(context.TODO(), n.Key, n.PID, client.WithLease(resp.ID)); err != nil {
return
}
if _, err = n.Client.KeepAlive(context.TODO(), resp.ID); err != nil {
return
}
n.lID = resp.ID
return
}
// 更新 /cronsun/proc/xx/time
// 用于检查 node 是否存活
func (n *Node) Heartbeat() {
// 判断 node 是否已注册到 etcd
// 存在则返回进行 pid不存在返回 -1
func (n *Node) Exist() (pid int, err error) {
ctx, cancel := context.WithTimeout(context.Background(), ReqTimeout)
resp, err := n.Client.Get(ctx, n.Key, client.WithFromKey())
defer cancel()
if err != nil {
return
}
if len(resp.Kvs) == 0 {
return -1, nil
}
if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil {
if _, err = n.Client.Delete(ctx, n.Key, client.WithFromKey()); err != nil {
return
}
}
p, err := os.FindProcess(pid)
if err != nil {
return
}
if p != nil {
return
}
return -1, nil
}
// 启动服务
@ -54,5 +116,6 @@ func (n *Node) Run() {
// 启动服务
func (n *Node) Stop(i interface{}) {
n.Client.Revoke(context.TODO(), n.lID)
n.Client.Close()
}