node: 监听 group

处理增删改
pull/1/head
miraclesu 2017-01-20 17:42:50 +08:00
parent d2640709bb
commit 4460c5f4c1
3 changed files with 44 additions and 14 deletions

View File

@ -111,9 +111,10 @@ func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
return return
} }
func (j *Job) Schedule(nid string, gs map[string]*Group, rebuild bool) (string, string) { // Schedule return schedule and group id
func (j *Job) Schedule(nid string, gs map[string]*Group, rebuild bool) (sch string, gid string) {
if j.Pause { if j.Pause {
return "", "" return
} }
if j.build && !rebuild { if j.build && !rebuild {

View File

@ -14,11 +14,7 @@ func newJob(id string, g Group) (j Job, err error) {
j = make(Job, len(jobs)) j = make(Job, len(jobs))
for _, job := range jobs { for _, job := range jobs {
if job.Pause { if sch, _ := job.Schedule(id, g, false); len(sch) > 0 {
continue
}
if len(job.Schedule(id, g)) > 0 {
j[job.GetID()] = job j[job.GetID()] = job
} }
} }

View File

@ -165,15 +165,28 @@ func (n *Node) delJob(job *models.Job) {
n.Cron.DelJob(job) n.Cron.DelJob(job)
} }
func (n *Node) addGroup(g *models.Group) { func (n *Node) addGroup(g *models.Group) bool {
if !g.Included(n.ID) { if !g.Included(n.ID) {
return return false
} }
return
if og, ok := n.groups[g.ID]; ok {
*og = *g
// TODO 处理相应的 jobs
return true
}
n.groups[g.ID] = g
return true
} }
func (n *Node) delGroup(g *models.Group) { func (n *Node) delGroup(g *models.Group) {
if !g.Included(n.ID) {
return
}
delete(n.groups, g.ID) delete(n.groups, g.ID)
// TODO 处理相应的 jobs
} }
func (n *Node) watchJobs() { func (n *Node) watchJobs() {
@ -189,7 +202,6 @@ func (n *Node) watchJobs() {
} }
n.addJob(job) n.addJob(job)
case ev.IsModify(): case ev.IsModify():
job, err := models.GetJobFromKv(ev.Kv) job, err := models.GetJobFromKv(ev.Kv)
if err != nil { if err != nil {
@ -208,7 +220,6 @@ func (n *Node) watchJobs() {
// 此结点暂停或不再执行此 job // 此结点暂停或不再执行此 job
n.delJob(prevJob) n.delJob(prevJob)
case ev.Type == client.EventTypeDelete: case ev.Type == client.EventTypeDelete:
prevJob, err := models.GetJobFromKv(ev.PrevKv) prevJob, err := models.GetJobFromKv(ev.PrevKv)
if err != nil { if err != nil {
@ -217,7 +228,6 @@ func (n *Node) watchJobs() {
} }
n.delJob(prevJob) n.delJob(prevJob)
default: default:
log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key)) log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
} }
@ -239,9 +249,32 @@ func (n *Node) watchGroups() {
} }
n.addGroup(g) n.addGroup(g)
case ev.IsModify(): case ev.IsModify():
g, err := models.GetGroupFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
prevG, err := models.GetGroupFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
if n.addGroup(g) {
continue
}
// 此 group 已移除当前结点
n.delGroup(prevG)
case ev.Type == client.EventTypeDelete: case ev.Type == client.EventTypeDelete:
prevG, err := models.GetGroupFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
n.delGroup(prevG)
default: default:
log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key)) log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key))
} }