noticer: 增加故障节点通知

pull/1/head
miraclesu 2017-04-14 15:33:14 +08:00
parent a10c01a391
commit 1cdb750245
3 changed files with 56 additions and 7 deletions

View File

@ -557,12 +557,7 @@ func (j *Job) Notify(t time.Time, msg string) {
m := Message{ m := Message{
Subject: "node[" + j.runOn + "] job[" + j.ID + "] time[" + ts + "] exec failed", Subject: "node[" + j.runOn + "] job[" + j.ID + "] time[" + ts + "] exec failed",
Body: body, Body: body,
} To: j.To,
if len(conf.Config.Mail.To) > 0 {
m.To = append(m.To, conf.Config.Mail.To...)
}
if len(j.To) > 0 {
m.To = append(m.To, j.To...)
} }
data, err := json.Marshal(m) data, err := json.Marshal(m)

View File

@ -74,13 +74,28 @@ func (n *Node) Exist() (pid int, err error) {
} }
func GetNodes() (nodes []*Node, err error) { func GetNodes() (nodes []*Node, err error) {
return GetNodesBy(nil)
}
func GetNodesBy(query interface{}) (nodes []*Node, err error) {
err = mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error { err = mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
return c.Find(nil).All(&nodes) return c.Find(query).All(&nodes)
}) })
return return
} }
func ISNodeFault(id string) (bool, error) {
n := 0
err := mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
var e error
n, e = c.Find(bson.M{"_id": id, "alived": true}).Count()
return e
})
return n > 0, err
}
func GetNodeGroups() (list []*Group, err error) { func GetNodeGroups() (list []*Group, err error) {
resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix(), client.WithSort(client.SortByKey, client.SortAscend)) resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix(), client.WithSort(client.SortByKey, client.SortAscend))
if err != nil { if err != nil {
@ -101,6 +116,10 @@ func GetNodeGroups() (list []*Group, err error) {
return return
} }
func WatchNode() client.WatchChan {
return DefalutClient.Watch(conf.Config.Node, client.WithPrefix())
}
// On 结点实例启动后,在 mongoDB 中记录存活信息 // On 结点实例启动后,在 mongoDB 中记录存活信息
func (n *Node) On() { func (n *Node) On() {
n.Alived = true n.Alived = true

View File

@ -146,6 +146,8 @@ func (h *HttpAPI) Send(msg *Message) {
func StartNoticer(n Noticer) { func StartNoticer(n Noticer) {
go n.Serve() go n.Serve()
go monitorNodes(n)
rch := DefalutClient.Watch(conf.Config.Noticer, client.WithPrefix()) rch := DefalutClient.Watch(conf.Config.Noticer, client.WithPrefix())
var err error var err error
for wresp := range rch { for wresp := range rch {
@ -157,8 +159,41 @@ func StartNoticer(n Noticer) {
log.Warnf("msg[%s] umarshal err: %s", string(ev.Kv.Value), err.Error()) log.Warnf("msg[%s] umarshal err: %s", string(ev.Kv.Value), err.Error())
continue continue
} }
if len(conf.Config.Mail.To) > 0 {
msg.To = append(msg.To, conf.Config.Mail.To...)
}
n.Send(msg) n.Send(msg)
} }
} }
} }
} }
func monitorNodes(n Noticer) {
var (
err error
ok bool
id string
)
rch := WatchNode()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.Type == client.EventTypeDelete:
id = GetIDFromKey(string(ev.Kv.Key))
ok, err = ISNodeFault(id)
if err != nil {
log.Warnf("query node[%s] err: %s", id, err.Error())
continue
}
if ok {
n.Send(&Message{
Subject: "node[" + id + "] fault at time[" + time.Now().Format(time.RFC3339) + "]",
})
}
}
}
}
}