From 1cdb7502450015963429866ab9a52456f1555dba Mon Sep 17 00:00:00 2001
From: miraclesu
Date: Fri, 14 Apr 2017 15:33:14 +0800
Subject: [PATCH] =?UTF-8?q?noticer:=20=E5=A2=9E=E5=8A=A0=E6=95=85=E9=9A=9C?=
 =?UTF-8?q?=E8=8A=82=E7=82=B9=E9=80=9A=E7=9F=A5?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Review notes (text between the "---" line and the first "diff --git" line
is commentary, not part of the commit):

- monitorNodes now sets To to conf.Config.Mail.To on the node-fault message.
  StartNoticer only appends the configured recipients to messages it reads
  from the noticer watch, so the fault message was otherwise handed to
  n.Send with an empty recipient list.
- Added doc comments to the newly introduced GetNodesBy, ISNodeFault,
  WatchNode and monitorNodes.
- Hunk line counts and the diffstat below were recomputed for the added
  lines. Tab indentation was reconstructed in gofmt style and should be
  checked against the target files before applying. The commit id on the
  first line and the post-image blob ids on the "index" lines predate
  these changes.

 models/job.go     |  7 +------
 models/node.go    | 29 ++++++++++++++++++++++++++++-
 models/noticer.go | 40 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 69 insertions(+), 7 deletions(-)

diff --git a/models/job.go b/models/job.go
index 2a8708e..f81a81a 100644
--- a/models/job.go
+++ b/models/job.go
@@ -557,12 +557,7 @@ func (j *Job) Notify(t time.Time, msg string) {
 	m := Message{
 		Subject: "node[" + j.runOn + "] job[" + j.ID + "] time[" + ts + "] exec failed",
 		Body:    body,
-	}
-	if len(conf.Config.Mail.To) > 0 {
-		m.To = append(m.To, conf.Config.Mail.To...)
-	}
-	if len(j.To) > 0 {
-		m.To = append(m.To, j.To...)
+		To:      j.To,
 	}
 
 	data, err := json.Marshal(m)
diff --git a/models/node.go b/models/node.go
index 025ede8..187db57 100644
--- a/models/node.go
+++ b/models/node.go
@@ -74,13 +74,34 @@ func (n *Node) Exist() (pid int, err error) {
 }
 
 func GetNodes() (nodes []*Node, err error) {
+	return GetNodesBy(nil)
+}
+
+// GetNodesBy returns the nodes in the node collection that match query.
+// GetNodes passes a nil query to fetch every node.
+func GetNodesBy(query interface{}) (nodes []*Node, err error) {
 	err = mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
-		return c.Find(nil).All(&nodes)
+		return c.Find(query).All(&nodes)
 	})
 
 	return
 }
 
+// ISNodeFault reports whether the node with the given id is still recorded
+// with alived set to true in the node collection. monitorNodes calls it after
+// the node's watched key has been deleted: a node whose key is gone but which
+// is still marked alived is reported as faulty.
+func ISNodeFault(id string) (bool, error) {
+	n := 0
+	err := mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
+		var e error
+		n, e = c.Find(bson.M{"_id": id, "alived": true}).Count()
+		return e
+	})
+
+	return n > 0, err
+}
+
 func GetNodeGroups() (list []*Group, err error) {
 	resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix(), client.WithSort(client.SortByKey, client.SortAscend))
 	if err != nil {
@@ -101,6 +122,12 @@ func GetNodeGroups() (list []*Group, err error) {
 	return
 }
 
+// WatchNode starts a watch on every key under the conf.Config.Node prefix
+// and returns the channel on which the watch responses are delivered.
+func WatchNode() client.WatchChan {
+	return DefalutClient.Watch(conf.Config.Node, client.WithPrefix())
+}
+
 // On 结点实例启动后,在 mongoDB 中记录存活信息
 func (n *Node) On() {
 	n.Alived = true
diff --git a/models/noticer.go b/models/noticer.go
index 7c90431..d3e0a87 100644
--- a/models/noticer.go
+++ b/models/noticer.go
@@ -146,6 +146,8 @@ func (h *HttpAPI) Send(msg *Message) {
 func StartNoticer(n Noticer) {
 	go n.Serve()
 
+	go monitorNodes(n)
+
 	rch := DefalutClient.Watch(conf.Config.Noticer, client.WithPrefix())
 	var err error
 	for wresp := range rch {
@@ -157,8 +159,46 @@ func StartNoticer(n Noticer) {
 					log.Warnf("msg[%s] umarshal err: %s", string(ev.Kv.Value), err.Error())
 					continue
 				}
+
+				if len(conf.Config.Mail.To) > 0 {
+					msg.To = append(msg.To, conf.Config.Mail.To...)
+				}
 				n.Send(msg)
 			}
 		}
 	}
 }
+
+// monitorNodes consumes the WatchNode channel and, whenever a node key is
+// deleted while ISNodeFault still reports that node as alived, sends a fault
+// notice through n addressed to conf.Config.Mail.To. Lookup errors are logged
+// and the event is skipped. It returns only when the watch channel is closed.
+func monitorNodes(n Noticer) {
+	var (
+		err error
+		ok  bool
+		id  string
+	)
+	rch := WatchNode()
+
+	for wresp := range rch {
+		for _, ev := range wresp.Events {
+			switch {
+			case ev.Type == client.EventTypeDelete:
+				id = GetIDFromKey(string(ev.Kv.Key))
+				ok, err = ISNodeFault(id)
+				if err != nil {
+					log.Warnf("query node[%s] err: %s", id, err.Error())
+					continue
+				}
+
+				if ok {
+					n.Send(&Message{
+						Subject: "node[" + id + "] fault at time[" + time.Now().Format(time.RFC3339) + "]",
+						To:      conf.Config.Mail.To,
+					})
+				}
+			}
+		}
+	}
+}