node: 修复 group 更新处理不到 job 的问题

pull/1/head
miraclesu 2017-02-20 12:19:49 +08:00
parent 50ef9e0b8e
commit f9dbc16d0f
3 changed files with 99 additions and 62 deletions

View File

@ -56,20 +56,20 @@ func (c *Cmd) GetID() string {
}
// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理
func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) {
func (j *JobRule) included(nid string, gs map[string]*Group) bool {
for i, count := 0, len(j.NodeIDs); i < count; i++ {
if nid == j.NodeIDs[i] {
return "", true
return true
}
}
for _, gid := range j.GroupIDs {
if _, ok := gs[gid]; ok {
return gid, true
if g, ok := gs[gid]; ok && g.Included(nid) {
return true
}
}
return "", false
return false
}
func GetJob(group, id string) (job *Job, err error) {
@ -262,7 +262,7 @@ func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) {
}
}
if _, ok := r.included(nid, gs); ok {
if r.included(nid, gs) {
cmd := &Cmd{
Job: j,
JobRule: r,

View File

@ -5,3 +5,66 @@ import (
)
type Groups map[string]*models.Group
type jobLink struct {
gname string
// rule id
rules map[string]bool
}
// map[group id]map[job id]*jobLink
// 用于 group 发生变化的时候修改相应的 job
type link map[string]map[string]*jobLink
func newLink(size int) link {
return make(link, size)
}
func (l link) add(gid, jid, rid, gname string) {
js, ok := l[gid]
if !ok {
js = make(map[string]*jobLink, 4)
l[gid] = js
}
j, ok := js[jid]
if !ok {
j = &jobLink{
gname: gname,
rules: make(map[string]bool),
}
js[jid] = j
}
j.rules[rid] = true
}
func (l link) del(gid, jid, rid string) {
js, ok := l[gid]
if !ok {
return
}
j, ok := js[jid]
if !ok {
return
}
delete(j.rules, rid)
if len(j.rules) == 0 {
delete(js, jid)
}
}
func (l link) delJob(gid, jid string) {
js, ok := l[gid]
if !ok {
return
}
delete(js, jid)
}
func (l link) delGroup(gid string) {
delete(l, gid)
}

View File

@ -26,9 +26,8 @@ type Node struct {
jobs Jobs
groups Groups
cmds map[string]*models.Cmd
// map[group id]map[job id](job group)
// 用于 group 发生变化的时候修改相应的 job
link map[string]map[string]string
link
// 删除的 job id用于 group 更新
delIDs map[string]bool
@ -92,7 +91,7 @@ func (n *Node) Register() (err error) {
}
func (n *Node) loadJobs() (err error) {
if n.groups, err = models.GetGroups(n.ID); err != nil {
if n.groups, err = models.GetGroups(""); err != nil {
return
}
@ -101,7 +100,7 @@ func (n *Node) loadJobs() (err error) {
return
}
n.jobs, n.link = make(Jobs, len(jobs)), make(map[string]map[string]string, len(n.groups))
n.jobs, n.link = make(Jobs, len(jobs)), newLink(len(n.groups))
if len(jobs) == 0 {
return
}
@ -116,7 +115,7 @@ func (n *Node) loadJobs() (err error) {
func (n *Node) addJob(job *models.Job, notice bool) {
for _, r := range job.Rules {
for _, gid := range r.GroupIDs {
n.addLink(gid, job.ID, job.Group)
n.link.add(gid, job.ID, r.ID, job.Group)
}
}
@ -143,7 +142,7 @@ func (n *Node) delJob(id string) {
delete(n.jobs, id)
for _, r := range job.Rules {
for _, gid := range r.GroupIDs {
n.delLink(gid, job.ID)
n.link.delJob(gid, job.ID)
}
}
@ -168,7 +167,7 @@ func (n *Node) modJob(job *models.Job) {
for _, r := range oJob.Rules {
for _, gid := range r.GroupIDs {
n.delLink(gid, oJob.ID)
n.link.del(gid, oJob.ID, r.ID)
}
}
@ -187,7 +186,7 @@ func (n *Node) modJob(job *models.Job) {
for _, r := range oJob.Rules {
for _, gid := range r.GroupIDs {
n.addLink(gid, oJob.ID, oJob.Group)
n.link.add(gid, oJob.ID, r.ID, oJob.Group)
}
}
}
@ -229,38 +228,14 @@ func (n *Node) delCmd(cmd *models.Cmd) {
log.Noticef("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.JobRule.ID, cmd.Schedule)
}
func (n *Node) addLink(gid, jid, gname string) {
if len(gid) == 0 {
return
}
js, ok := n.link[gid]
if !ok {
js = make(map[string]string, 4)
n.link[gid] = js
}
js[jid] = gname
}
func (n *Node) delLink(gid, jid string) {
if len(gid) == 0 {
return
}
js, ok := n.link[gid]
if !ok {
return
}
delete(js, jid)
}
func (n *Node) addGroup(g *models.Group) {
n.groups[g.ID] = g
}
func (n *Node) delGroup(id string) {
delete(n.groups, id)
n.link.delGroup(id)
job, ok := n.jobs[id]
// 之前此任务没有在当前结点执行
if !ok {
@ -294,29 +269,26 @@ func (n *Node) modGroup(g *models.Group) {
// 增加当前节点
if !oGroup.Included(n.ID) && g.Included(n.ID) {
n.groups[g.ID] = g
jids := n.link[g.ID]
if len(jids) == 0 {
jls := n.link[g.ID]
if len(jls) == 0 {
return
}
var err error
for jid, gname := range jids {
for jid, jl := range jls {
job, ok := n.jobs[jid]
// job 之前已运行,无需改动
if ok {
continue
}
if !ok {
// job 已删除
if n.delIDs[jid] {
n.link.delJob(g.ID, jid)
continue
}
// job 已删除
if n.delIDs[jid] {
n.delLink(g.ID, jid)
continue
}
if job, err = models.GetJob(gname, jid); err != nil {
log.Warnf("get job[%s][%s] err: %s", gname, jid, err.Error())
n.delLink(g.ID, jid)
continue
if job, err = models.GetJob(jl.gname, jid); err != nil {
log.Warnf("get job[%s][%s] err: %s", jl.gname, jid, err.Error())
n.link.delJob(g.ID, jid)
continue
}
}
cmds := job.Cmds(n.ID, n.groups)
@ -324,20 +296,22 @@ func (n *Node) modGroup(g *models.Group) {
n.addCmd(cmd, true)
}
}
return
}
// 移除当前节点
jids := n.link[g.ID]
if len(jids) == 0 {
jls := n.link[g.ID]
if len(jls) == 0 {
n.groups[g.ID] = g
return
}
for jid, _ := range jids {
for jid, _ := range jls {
job, ok := n.jobs[jid]
if !ok {
// 数据出错
log.Warnf("WTF! group[%s] job[%s]", g.ID, jid)
n.link.delJob(g.ID, jid)
continue
}