node: 调整 job 更新

1. 更新删除方式,去掉 prev
2. 修复更新 job 时引用多个 *job 的问题
pull/1/head
miraclesu 2017-02-17 18:55:40 +08:00
parent b47a33687f
commit 7e1d4c1bd5
4 changed files with 35 additions and 20 deletions

View File

@ -129,7 +129,7 @@ func GetJobs() (jobs map[string]*Job, err error) {
} }
func WatchJobs() client.WatchChan { func WatchJobs() client.WatchChan {
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix(), client.WithPrevKV()) return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
} }
func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
@ -223,6 +223,16 @@ func (j *Job) Run() {
j.Success(t, string(out)) j.Success(t, string(out))
} }
// 从 etcd 的 key 中取 job id
func GetJobID(key string) string {
index := strings.LastIndex(key, "/")
if index < 0 {
return ""
}
return key[index+1:]
}
func JobKey(group, id string) string { func JobKey(group, id string) string {
return conf.Config.Cmd + group + "/" + id return conf.Config.Cmd + group + "/" + id
} }

View File

@ -4,4 +4,4 @@ import (
"sunteng/cronsun/models" "sunteng/cronsun/models"
) )
type Group map[string]*models.Group type Groups map[string]*models.Group

View File

@ -6,7 +6,7 @@ import (
type Jobs map[string]*models.Job type Jobs map[string]*models.Job
func loadJobs(id string, g Group) (j Jobs, err error) { func loadJobs(id string, g Groups) (j Jobs, err error) {
jobs, err := models.GetJobs() jobs, err := models.GetJobs()
if err != nil { if err != nil {
return return

View File

@ -24,7 +24,7 @@ type Node struct {
*cron.Cron *cron.Cron
jobs Jobs jobs Jobs
groups Group groups Groups
cmds map[string]*models.Cmd cmds map[string]*models.Cmd
// map[group id]map[job id]bool // map[group id]map[job id]bool
// 用于 group 发生变化的时候修改相应的 job // 用于 group 发生变化的时候修改相应的 job
@ -113,13 +113,20 @@ func (n *Node) addJob(job *models.Job, notice bool) {
return return
} }
n.jobs[job.ID] = job
for _, cmd := range cmds { for _, cmd := range cmds {
n.addCmd(cmd, notice) n.addCmd(cmd, notice)
} }
return return
} }
func (n *Node) delJob(job *models.Job) { func (n *Node) delJob(id string) {
job, ok := n.jobs[id]
// 之前此任务没有在当前结点执行
if !ok {
return
}
cmds := job.Cmds(n.ID, n.groups) cmds := job.Cmds(n.ID, n.groups)
if len(cmds) == 0 { if len(cmds) == 0 {
return return
@ -131,8 +138,17 @@ func (n *Node) delJob(job *models.Job) {
return return
} }
func (n *Node) modJob(job, prevJob *models.Job) { func (n *Node) modJob(job *models.Job) {
cmds, prevCmds := job.Cmds(n.ID, n.groups), prevJob.Cmds(n.ID, n.groups) oJob, ok := n.jobs[job.ID]
// 之前此任务没有在当前结点执行,直接增加任务
if !ok {
n.addJob(job, true)
return
}
prevCmds := oJob.Cmds(n.ID, n.groups)
*oJob = *job
cmds := oJob.Cmds(n.ID, n.groups)
for id, cmd := range cmds { for id, cmd := range cmds {
n.addCmd(cmd, true) n.addCmd(cmd, true)
@ -251,21 +267,10 @@ func (n *Node) watchJobs() {
log.Warnf(err.Error()) log.Warnf(err.Error())
continue continue
} }
prevJob, err := models.GetJobFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
n.modJob(job, prevJob) n.modJob(job)
case ev.Type == client.EventTypeDelete: case ev.Type == client.EventTypeDelete:
prevJob, err := models.GetJobFromKv(ev.PrevKv) n.delJob(models.GetJobID(string(ev.Kv.Key)))
if err != nil {
log.Warnf(err.Error())
continue
}
n.delJob(prevJob)
default: default:
log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key)) log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
} }