From f9dbc16d0f0d89f397f4e1d367251c04c2162f06 Mon Sep 17 00:00:00 2001 From: miraclesu Date: Mon, 20 Feb 2017 12:19:49 +0800 Subject: [PATCH] =?UTF-8?q?node:=20=E4=BF=AE=E5=A4=8D=20group=20=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E5=A4=84=E7=90=86=E4=B8=8D=E5=88=B0=20job=20=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/job.go | 12 +++---- node/group.go | 63 +++++++++++++++++++++++++++++++++++++ node/node.go | 86 ++++++++++++++++++--------------------------------- 3 files changed, 99 insertions(+), 62 deletions(-) diff --git a/models/job.go b/models/job.go index fdb999a..d2ea27f 100644 --- a/models/job.go +++ b/models/job.go @@ -56,20 +56,20 @@ func (c *Cmd) GetID() string { } // 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理 -func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) { +func (j *JobRule) included(nid string, gs map[string]*Group) bool { for i, count := 0, len(j.NodeIDs); i < count; i++ { if nid == j.NodeIDs[i] { - return "", true + return true } } for _, gid := range j.GroupIDs { - if _, ok := gs[gid]; ok { - return gid, true + if g, ok := gs[gid]; ok && g.Included(nid) { + return true } } - return "", false + return false } func GetJob(group, id string) (job *Job, err error) { @@ -262,7 +262,7 @@ func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) { } } - if _, ok := r.included(nid, gs); ok { + if r.included(nid, gs) { cmd := &Cmd{ Job: j, JobRule: r, diff --git a/node/group.go b/node/group.go index 61fdbc1..c1607a6 100644 --- a/node/group.go +++ b/node/group.go @@ -5,3 +5,66 @@ import ( ) type Groups map[string]*models.Group + +type jobLink struct { + gname string + // rule id + rules map[string]bool +} + +// map[group id]map[job id]*jobLink +// 用于 group 发生变化的时候修改相应的 job +type link map[string]map[string]*jobLink + +func newLink(size int) link { + return make(link, size) +} + +func (l link) add(gid, jid, rid, gname string) { + js, ok := l[gid] + if !ok { + js = make(map[string]*jobLink, 4) + l[gid] = js + } + + j, ok := js[jid] + if !ok { + j = &jobLink{ + gname: gname, + rules: make(map[string]bool), + } + js[jid] = j + } + + j.rules[rid] = true +} + +func (l link) del(gid, jid, rid string) { + js, ok := l[gid] + if !ok { + return + } + + j, ok := js[jid] + if !ok { + return + } + + delete(j.rules, rid) + if len(j.rules) == 0 { + delete(js, jid) + } +} + +func (l link) delJob(gid, jid string) { + js, ok := l[gid] + if !ok { + return + } + + delete(js, jid) +} + +func (l link) delGroup(gid string) { + delete(l, gid) +} diff --git a/node/node.go b/node/node.go index 8d005e8..048a69a 100644 --- a/node/node.go +++ b/node/node.go @@ -26,9 +26,8 @@ type Node struct { jobs Jobs groups Groups cmds map[string]*models.Cmd - // map[group id]map[job id](job group) - // 用于 group 发生变化的时候修改相应的 job - link map[string]map[string]string + + link // 删除的 job id,用于 group 更新 delIDs map[string]bool @@ -92,7 +91,7 @@ func (n *Node) Register() (err error) { } func (n *Node) loadJobs() (err error) { - if n.groups, err = models.GetGroups(n.ID); err != nil { + if n.groups, err = models.GetGroups(""); err != nil { return } @@ -101,7 +100,7 @@ func (n *Node) loadJobs() (err error) { return } - n.jobs, n.link = make(Jobs, len(jobs)), make(map[string]map[string]string, len(n.groups)) + n.jobs, n.link = make(Jobs, len(jobs)), newLink(len(n.groups)) if len(jobs) == 0 { return } @@ -116,7 +115,7 @@ func (n *Node) loadJobs() (err error) { func (n *Node) addJob(job *models.Job, notice bool) { for _, r := range job.Rules { for _, gid := range r.GroupIDs { - n.addLink(gid, job.ID, job.Group) + n.link.add(gid, job.ID, r.ID, job.Group) } } @@ -143,7 +142,7 @@ func (n *Node) delJob(id string) { delete(n.jobs, id) for _, r := range job.Rules { for _, gid := range r.GroupIDs { - n.delLink(gid, job.ID) + n.link.delJob(gid, job.ID) } } @@ -168,7 +167,7 @@ func (n *Node) modJob(job *models.Job) { for _, r := range oJob.Rules { for _, gid := range r.GroupIDs { - n.delLink(gid, oJob.ID) + n.link.del(gid, oJob.ID, r.ID) } } @@ -187,7 +186,7 @@ func (n *Node) modJob(job *models.Job) { for _, r := range oJob.Rules { for _, gid := range r.GroupIDs { - n.addLink(gid, oJob.ID, oJob.Group) + n.link.add(gid, oJob.ID, r.ID, oJob.Group) } } } @@ -229,38 +228,14 @@ func (n *Node) delCmd(cmd *models.Cmd) { log.Noticef("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.JobRule.ID, cmd.Schedule) } -func (n *Node) addLink(gid, jid, gname string) { - if len(gid) == 0 { - return - } - - js, ok := n.link[gid] - if !ok { - js = make(map[string]string, 4) - n.link[gid] = js - } - - js[jid] = gname -} - -func (n *Node) delLink(gid, jid string) { - if len(gid) == 0 { - return - } - - js, ok := n.link[gid] - if !ok { - return - } - - delete(js, jid) -} - func (n *Node) addGroup(g *models.Group) { n.groups[g.ID] = g } func (n *Node) delGroup(id string) { + delete(n.groups, id) + n.link.delGroup(id) + job, ok := n.jobs[id] // 之前此任务没有在当前结点执行 if !ok { @@ -294,29 +269,26 @@ func (n *Node) modGroup(g *models.Group) { // 增加当前节点 if !oGroup.Included(n.ID) && g.Included(n.ID) { n.groups[g.ID] = g - jids := n.link[g.ID] - if len(jids) == 0 { + jls := n.link[g.ID] + if len(jls) == 0 { return } var err error - for jid, gname := range jids { + for jid, jl := range jls { job, ok := n.jobs[jid] - // job 之前已运行,无需改动 - if ok { - continue - } + if !ok { + // job 已删除 + if n.delIDs[jid] { + n.link.delJob(g.ID, jid) + continue + } - // job 已删除 - if n.delIDs[jid] { - n.delLink(g.ID, jid) - continue - } - - if job, err = models.GetJob(gname, jid); err != nil { - log.Warnf("get job[%s][%s] err: %s", gname, jid, err.Error()) - n.delLink(g.ID, jid) - continue + if job, err = models.GetJob(jl.gname, jid); err != nil { + log.Warnf("get job[%s][%s] err: %s", jl.gname, jid, err.Error()) + n.link.delJob(g.ID, jid) + continue + } } cmds := job.Cmds(n.ID, n.groups) @@ -324,20 +296,22 @@ func (n *Node) modGroup(g *models.Group) { n.addCmd(cmd, true) } } + return } // 移除当前节点 - jids := n.link[g.ID] - if len(jids) == 0 { + jls := n.link[g.ID] + if len(jls) == 0 { n.groups[g.ID] = g return } - for jid, _ := range jids { + for jid, _ := range jls { job, ok := n.jobs[jid] if !ok { // 数据出错 log.Warnf("WTF! group[%s] job[%s]", g.ID, jid) + n.link.delJob(g.ID, jid) continue }