node: 监听 group 更新相应的 job

pull/1/head
miraclesu 2017-02-17 21:29:04 +08:00
parent 7e1d4c1bd5
commit 924838962c
2 changed files with 144 additions and 46 deletions

View File

@ -223,8 +223,8 @@ func (j *Job) Run() {
j.Success(t, string(out)) j.Success(t, string(out))
} }
// 从 etcd 的 key 中取 job id // 从 etcd 的 key 中取 id
func GetJobID(key string) string { func GetIDFromKey(key string) string {
index := strings.LastIndex(key, "/") index := strings.LastIndex(key, "/")
if index < 0 { if index < 0 {
return "" return ""

View File

@ -26,9 +26,11 @@ type Node struct {
jobs Jobs jobs Jobs
groups Groups groups Groups
cmds map[string]*models.Cmd cmds map[string]*models.Cmd
// map[group id]map[job id]bool // map[group id]map[job id](job group)
// 用于 group 发生变化的时候修改相应的 job // 用于 group 发生变化的时候修改相应的 job
link map[string]map[string]bool link map[string]map[string]string
// 删除的 job id用于 group 更新
delIDs map[string]bool
ttl int64 ttl int64
@ -96,18 +98,30 @@ func (n *Node) loadJobs() (err error) {
if n.jobs, err = loadJobs(n.ID, n.groups); err != nil { if n.jobs, err = loadJobs(n.ID, n.groups); err != nil {
return return
} }
jobs, err := models.GetJobs()
if len(n.jobs) == 0 { if err != nil {
return return
} }
for _, job := range n.jobs { n.jobs, n.link = make(Jobs, len(jobs)), make(map[string]map[string]string, len(n.groups))
if len(jobs) == 0 {
return
}
for _, job := range jobs {
n.addJob(job, false) n.addJob(job, false)
} }
return return
} }
func (n *Node) addJob(job *models.Job, notice bool) { func (n *Node) addJob(job *models.Job, notice bool) {
for _, r := range job.Rules {
for _, gid := range r.GroupIDs {
n.addLink(gid, job.ID, job.Group)
}
}
cmds := job.Cmds(n.ID, n.groups) cmds := job.Cmds(n.ID, n.groups)
if len(cmds) == 0 { if len(cmds) == 0 {
return return
@ -124,9 +138,17 @@ func (n *Node) delJob(id string) {
job, ok := n.jobs[id] job, ok := n.jobs[id]
// 之前此任务没有在当前结点执行 // 之前此任务没有在当前结点执行
if !ok { if !ok {
n.delIDs[id] = true
return return
} }
delete(n.jobs, id)
for _, r := range job.Rules {
for _, gid := range r.GroupIDs {
n.delLink(gid, job.ID)
}
}
cmds := job.Cmds(n.ID, n.groups) cmds := job.Cmds(n.ID, n.groups)
if len(cmds) == 0 { if len(cmds) == 0 {
return return
@ -146,6 +168,12 @@ func (n *Node) modJob(job *models.Job) {
return return
} }
for _, r := range oJob.Rules {
for _, gid := range r.GroupIDs {
n.delLink(gid, oJob.ID)
}
}
prevCmds := oJob.Cmds(n.ID, n.groups) prevCmds := oJob.Cmds(n.ID, n.groups)
*oJob = *job *oJob = *job
cmds := oJob.Cmds(n.ID, n.groups) cmds := oJob.Cmds(n.ID, n.groups)
@ -158,6 +186,12 @@ func (n *Node) modJob(job *models.Job) {
for _, cmd := range prevCmds { for _, cmd := range prevCmds {
n.delCmd(cmd) n.delCmd(cmd)
} }
for _, r := range oJob.Rules {
for _, gid := range r.GroupIDs {
n.addLink(gid, oJob.ID, oJob.Group)
}
}
} }
func (n *Node) addCmd(cmd *models.Cmd, notice bool) { func (n *Node) addCmd(cmd *models.Cmd, notice bool) {
@ -197,18 +231,18 @@ func (n *Node) delCmd(cmd *models.Cmd) {
log.Noticef("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.JobRule.ID, cmd.Schedule) log.Noticef("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.JobRule.ID, cmd.Schedule)
} }
func (n *Node) addLink(gid, jid string) { func (n *Node) addLink(gid, jid, gname string) {
if len(gid) == 0 { if len(gid) == 0 {
return return
} }
js, ok := n.link[gid] js, ok := n.link[gid]
if !ok { if !ok {
js = make(map[string]bool, 4) js = make(map[string]string, 4)
n.link[gid] = js n.link[gid] = js
} }
js[jid] = true js[jid] = gname
} }
func (n *Node) delLink(gid, jid string) { func (n *Node) delLink(gid, jid string) {
@ -224,28 +258,108 @@ func (n *Node) delLink(gid, jid string) {
delete(js, jid) delete(js, jid)
} }
func (n *Node) addGroup(g *models.Group) bool { func (n *Node) addGroup(g *models.Group) {
if !g.Included(n.ID) {
return false
}
if og, ok := n.groups[g.ID]; ok {
*og = *g
// TODO 处理相应的 jobs
return true
}
n.groups[g.ID] = g n.groups[g.ID] = g
return true
} }
func (n *Node) delGroup(g *models.Group) { func (n *Node) delGroup(id string) {
if !g.Included(n.ID) { job, ok := n.jobs[id]
// 之前此任务没有在当前结点执行
if !ok {
return return
} }
delete(n.groups, g.ID) cmds := job.Cmds(n.ID, n.groups)
// TODO 处理相应的 jobs if len(cmds) == 0 {
return
}
for _, cmd := range cmds {
n.delCmd(cmd)
}
return
}
func (n *Node) modGroup(g *models.Group) {
oGroup, ok := n.groups[g.ID]
if !ok {
n.addGroup(g)
return
}
// 都包含/都不包含当前节点,对当前节点任务无影响
if (oGroup.Included(n.ID) && g.Included(n.ID)) || (!oGroup.Included(n.ID) && !g.Included(n.ID)) {
*oGroup = *g
return
}
// 增加当前节点
if !oGroup.Included(n.ID) && g.Included(n.ID) {
n.groups[g.ID] = g
jids := n.link[g.ID]
if len(jids) == 0 {
return
}
var err error
for jid, gname := range jids {
job, ok := n.jobs[jid]
// job 之前已运行,无需改动
if ok {
continue
}
// job 已删除
if n.delIDs[jid] {
n.delLink(g.ID, jid)
continue
}
if job, err = models.GetJob(gname, jid); err != nil {
log.Warnf("get job[%s][%s] err: %s", gname, jid, err.Error())
n.delLink(g.ID, jid)
continue
}
cmds := job.Cmds(n.ID, n.groups)
for _, cmd := range cmds {
n.addCmd(cmd, true)
}
}
}
// 移除当前节点
jids := n.link[g.ID]
if len(jids) == 0 {
n.groups[g.ID] = g
return
}
for jid, _ := range jids {
job, ok := n.jobs[jid]
if !ok {
// 数据出错
log.Warnf("WTF! group[%s] job[%s]", g.ID, jid)
continue
}
n.groups[oGroup.ID] = oGroup
prevCmds := job.Cmds(n.ID, n.groups)
n.groups[g.ID] = g
cmds := job.Cmds(n.ID, n.groups)
for id, cmd := range cmds {
n.addCmd(cmd, true)
delete(prevCmds, id)
}
for _, cmd := range prevCmds {
n.delCmd(cmd)
}
}
n.groups[g.ID] = g
return
} }
func (n *Node) watchJobs() { func (n *Node) watchJobs() {
@ -270,7 +384,7 @@ func (n *Node) watchJobs() {
n.modJob(job) n.modJob(job)
case ev.Type == client.EventTypeDelete: case ev.Type == client.EventTypeDelete:
n.delJob(models.GetJobID(string(ev.Kv.Key))) n.delJob(models.GetIDFromKey(string(ev.Kv.Key)))
default: default:
log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key)) log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
} }
@ -297,26 +411,10 @@ func (n *Node) watchGroups() {
log.Warnf(err.Error()) log.Warnf(err.Error())
continue continue
} }
prevG, err := models.GetGroupFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
if n.addGroup(g) { n.modGroup(g)
continue
}
// 此 group 已移除当前结点
n.delGroup(prevG)
case ev.Type == client.EventTypeDelete: case ev.Type == client.EventTypeDelete:
prevG, err := models.GetGroupFromKv(ev.PrevKv) n.delGroup(models.GetIDFromKey(string(ev.Kv.Key)))
if err != nil {
log.Warnf(err.Error())
continue
}
n.delGroup(prevG)
default: default:
log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key)) log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key))
} }
@ -340,7 +438,7 @@ func (n *Node) Run() (err error) {
n.Cron.Start() n.Cron.Start()
go n.watchJobs() go n.watchJobs()
// go n.watchGroups() go n.watchGroups()
n.Node.On() n.Node.On()
return return
} }