diff --git a/models/job.go b/models/job.go index ba470ef..b858147 100644 --- a/models/job.go +++ b/models/job.go @@ -129,7 +129,7 @@ func GetJobs() (jobs map[string]*Job, err error) { } func WatchJobs() client.WatchChan { - return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix(), client.WithPrevKV()) + return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix()) } func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { @@ -223,6 +223,16 @@ func (j *Job) Run() { j.Success(t, string(out)) } +// 从 etcd 的 key 中取 job id +func GetJobID(key string) string { + index := strings.LastIndex(key, "/") + if index < 0 { + return "" + } + + return key[index+1:] +} + func JobKey(group, id string) string { return conf.Config.Cmd + group + "/" + id } diff --git a/node/group.go b/node/group.go index 034e6f7..61fdbc1 100644 --- a/node/group.go +++ b/node/group.go @@ -4,4 +4,4 @@ import ( "sunteng/cronsun/models" ) -type Group map[string]*models.Group +type Groups map[string]*models.Group diff --git a/node/job.go b/node/job.go index 55ac1c5..81b1b03 100644 --- a/node/job.go +++ b/node/job.go @@ -6,7 +6,7 @@ import ( type Jobs map[string]*models.Job -func loadJobs(id string, g Group) (j Jobs, err error) { +func loadJobs(id string, g Groups) (j Jobs, err error) { jobs, err := models.GetJobs() if err != nil { return diff --git a/node/node.go b/node/node.go index 0cb6bba..e186e91 100644 --- a/node/node.go +++ b/node/node.go @@ -24,7 +24,7 @@ type Node struct { *cron.Cron jobs Jobs - groups Group + groups Groups cmds map[string]*models.Cmd // map[group id]map[job id]bool // 用于 group 发生变化的时候修改相应的 job @@ -113,13 +113,20 @@ func (n *Node) addJob(job *models.Job, notice bool) { return } + n.jobs[job.ID] = job for _, cmd := range cmds { n.addCmd(cmd, notice) } return } -func (n *Node) delJob(job *models.Job) { +func (n *Node) delJob(id string) { + job, ok := n.jobs[id] + // 之前此任务没有在当前结点执行 + if !ok { + return + } + cmds := job.Cmds(n.ID, n.groups) if len(cmds) == 0 { return @@ -131,8 +138,17 @@ func (n *Node) delJob(job *models.Job) { return } -func (n *Node) modJob(job, prevJob *models.Job) { - cmds, prevCmds := job.Cmds(n.ID, n.groups), prevJob.Cmds(n.ID, n.groups) +func (n *Node) modJob(job *models.Job) { + oJob, ok := n.jobs[job.ID] + // 之前此任务没有在当前结点执行,直接增加任务 + if !ok { + n.addJob(job, true) + return + } + + prevCmds := oJob.Cmds(n.ID, n.groups) + *oJob = *job + cmds := oJob.Cmds(n.ID, n.groups) for id, cmd := range cmds { n.addCmd(cmd, true) @@ -251,21 +267,10 @@ func (n *Node) watchJobs() { log.Warnf(err.Error()) continue } - prevJob, err := models.GetJobFromKv(ev.PrevKv) - if err != nil { - log.Warnf(err.Error()) - continue - } - n.modJob(job, prevJob) + n.modJob(job) case ev.Type == client.EventTypeDelete: - prevJob, err := models.GetJobFromKv(ev.PrevKv) - if err != nil { - log.Warnf(err.Error()) - continue - } - - n.delJob(prevJob) + n.delJob(models.GetJobID(string(ev.Kv.Key))) default: log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key)) }