node: 调整 node etcd 路径

增加当前执行任务的 etcd 配置
pull/1/head
miraclesu 2017-03-06 16:09:52 +08:00
parent 2718dd22e0
commit 40f3fe087b
5 changed files with 13 additions and 10 deletions

View File

@ -41,7 +41,8 @@ func Init() error {
}
type Conf struct {
Proc string // proc 路径
Node string // node 进程路径
Proc string // 当前执行任务路径
Cmd string // cmd 路径
Group string // 节点分组
@ -98,6 +99,7 @@ func (c *Conf) parse() error {
log.InitConf(c.Log)
c.Cmd = cleanKeyPrefix(c.Cmd)
c.Node = cleanKeyPrefix(c.Node)
c.Proc = cleanKeyPrefix(c.Proc)
c.Group = cleanKeyPrefix(c.Group)
@ -121,9 +123,9 @@ func (c *Conf) watch() error {
case event := <-watcher.Events:
// 保存文件时会产生多个事件
if event.Op&(fsnotify.Write|fsnotify.Chmod) > 0 {
timer.Reset(duration)
update = true
}
timer.Reset(duration)
case <-timer.C:
if update {
c.reload()

View File

@ -1,5 +1,6 @@
{
"Web": "@extend:web.json",
"Node": "/cronsun/node/",
"Proc": "/cronsun/proc/",
"Cmd": "/cronsun/cmd/",
"Group": "/cronsun/group/",

View File

@ -18,7 +18,7 @@ const (
)
// 执行 cron cmd 的进程
// 注册到 /cronsun/proc/<id>
// 注册到 /cronsun/node/<id>
type Node struct {
ID string `bson:"_id" json:"id"` // ip
PID string `bson:"pid" json:"pid"` // 进程 pid
@ -32,17 +32,17 @@ func (n *Node) String() string {
}
func (n *Node) Put(opts ...client.OpOption) (*client.PutResponse, error) {
return DefalutClient.Put(conf.Config.Proc+n.ID, n.PID, opts...)
return DefalutClient.Put(conf.Config.Node+n.ID, n.PID, opts...)
}
func (n *Node) Del() (*client.DeleteResponse, error) {
return DefalutClient.Delete(conf.Config.Proc + n.ID)
return DefalutClient.Delete(conf.Config.Node + n.ID)
}
// 判断 node 是否已注册到 etcd
// 存在则返回进行 pid不存在返回 -1
func (n *Node) Exist() (pid int, err error) {
resp, err := DefalutClient.Get(conf.Config.Proc + n.ID)
resp, err := DefalutClient.Get(conf.Config.Node + n.ID)
if err != nil {
return
}
@ -52,7 +52,7 @@ func (n *Node) Exist() (pid int, err error) {
}
if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil {
if _, err = DefalutClient.Delete(conf.Config.Proc + n.ID); err != nil {
if _, err = DefalutClient.Delete(conf.Config.Node + n.ID); err != nil {
return
}
return -1, nil

View File

@ -61,7 +61,7 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) {
return
}
// 注册到 /cronsun/proc/xx
// 注册到 /cronsun/node/xx
func (n *Node) Register() (err error) {
pid, err := n.Node.Exist()
if err != nil {

View File

@ -157,7 +157,7 @@ func (n *Node) GetNodes(w http.ResponseWriter, r *http.Request) {
return
}
gresp, err := models.DefalutClient.Get(conf.Config.Proc, v3.WithPrefix(), v3.WithKeysOnly())
gresp, err := models.DefalutClient.Get(conf.Config.Node, v3.WithPrefix(), v3.WithKeysOnly())
if err == nil {
connecedMap := make(map[string]bool, gresp.Count)
for i := range gresp.Kvs {
@ -170,7 +170,7 @@ func (n *Node) GetNodes(w http.ResponseWriter, r *http.Request) {
nodes[i].Connected = connecedMap[nodes[i].ID]
}
} else {
log.Errorf("failed to fetch key[%s] from etcd: %s", conf.Config.Proc, err.Error())
log.Errorf("failed to fetch key[%s] from etcd: %s", conf.Config.Node, err.Error())
}
outJSONWithCode(w, http.StatusOK, nodes)