diff --git a/models/group.go b/models/group.go index f470979..b39f59b 100644 --- a/models/group.go +++ b/models/group.go @@ -2,9 +2,11 @@ package models import ( "encoding/json" + "fmt" "strings" client "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" "sunteng/commons/log" "sunteng/cronsun/conf" @@ -63,6 +65,14 @@ func WatchGroups() client.WatchChan { return DefalutClient.Watch(conf.Config.Group, client.WithPrefix()) } +func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) { + g = new(Group) + if err = json.Unmarshal(kv.Value, g); err != nil { + err = fmt.Errorf("group[%s] umarshal err: %s", string(kv.Key), err.Error()) + } + return +} + func DeleteGroupById(id string) (*client.DeleteResponse, error) { return DefalutClient.Delete(GroupKey(id)) } diff --git a/models/job.go b/models/job.go index ce53e92..661513c 100644 --- a/models/job.go +++ b/models/job.go @@ -30,6 +30,7 @@ type Job struct { // 每个任务在单个结点上只支持一个时间规则 // 如果需要多个时间规则,需建新的任务 schedule string + gid string build bool } @@ -40,20 +41,20 @@ type JobRule struct { ExcludeNodeIDs []string `json:"exclude_nids"` } -func (j *JobRule) included(nid string, gs map[string]*Group) bool { +func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) { for _, gid := range j.GroupIDs { if _, ok := gs[gid]; ok { - return true + return gid, true } } for i, count := 0, len(j.NodeIDs); i < count; i++ { if nid == j.NodeIDs[i] { - return true + return "", true } } - return false + return "", false } func GetJob(group, id string) (job *Job, err error) { @@ -102,7 +103,7 @@ func WatchJobs() client.WatchChan { return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix()) } -func GetJobsFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { +func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { job = new(Job) if err = json.Unmarshal(kv.Value, job); err != nil { err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error()) @@ -110,20 +111,21 @@ func GetJobsFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { return } -func (j *Job) Schedule(nid string, gs map[string]*Group) string { +func (j *Job) Schedule(nid string, gs map[string]*Group, rebuild bool) (string, string) { if j.Pause { - return "" + return "", "" } - if j.build { - return j.schedule + if j.build && !rebuild { + return j.schedule, j.gid } j.buildSchedule(nid, gs) - return j.schedule + return j.schedule, j.gid } func (j *Job) buildSchedule(nid string, gs map[string]*Group) { + j.build = true for _, r := range j.Rule { for _, id := range r.ExcludeNodeIDs { if nid == id { @@ -131,8 +133,8 @@ func (j *Job) buildSchedule(nid string, gs map[string]*Group) { } } - if r.included(nid, gs) { - j.schedule = r.Timer + if gid, ok := r.included(nid, gs); ok { + j.schedule, j.gid = r.Timer, gid return } } diff --git a/node/node.go b/node/node.go index 2abec13..c299767 100644 --- a/node/node.go +++ b/node/node.go @@ -25,6 +25,9 @@ type Node struct { jobs Job groups Group + // map[group id]map[job id]bool + // 用于 group 发生变化的时候修改相应的 job + link map[string]map[string]bool ttl int64 @@ -83,14 +86,50 @@ func (n *Node) Register() (err error) { return } -func (n *Node) addJobs() { +func (n *Node) addJobs() (err error) { + if n.groups, err = models.GetGroups(n.ID); err != nil { + return + } + if n.jobs, err = newJob(n.ID, n.groups); err != nil { + return + } + + n.link = make(map[string]map[string]bool, len(n.groups)) for _, job := range n.jobs { n.addJob(job) } + return +} + +func (n *Node) addLink(gid, jid string) { + if len(gid) == 0 { + return + } + + js, ok := n.link[gid] + if !ok { + js = make(map[string]bool, 4) + n.link[gid] = js + } + + js[jid] = true +} + +func (n *Node) delLink(gid, jid string) { + if len(gid) == 0 { + return + } + + js, ok := n.link[gid] + if !ok { + return + } + + delete(js, jid) } func (n *Node) addJob(job *models.Job) bool { - sch := job.Schedule(n.ID, n.groups) + sch, gid := job.Schedule(n.ID, n.groups, false) if len(sch) == 0 { return false } @@ -111,21 +150,39 @@ func (n *Node) addJob(job *models.Job) bool { return false } + n.addLink(gid, j.GetID()) return true } func (n *Node) delJob(job *models.Job) { + sch, gid := job.Schedule(n.ID, n.groups, false) + if len(sch) == 0 { + return + } + + n.delLink(gid, job.GetID()) delete(n.jobs, job.GetID()) n.Cron.DelJob(job) } +func (n *Node) addGroup(g *models.Group) { + if !g.Included(n.ID) { + return + } + return +} + +func (n *Node) delGroup(g *models.Group) { + delete(n.groups, g.ID) +} + func (n *Node) watchJobs() { rch := models.WatchJobs() for wresp := range rch { for _, ev := range wresp.Events { switch { case ev.IsCreate(): - job, err := models.GetJobsFromKv(ev.Kv) + job, err := models.GetJobFromKv(ev.Kv) if err != nil { log.Warnf(err.Error()) continue @@ -134,12 +191,12 @@ func (n *Node) watchJobs() { n.addJob(job) case ev.IsModify(): - job, err := models.GetJobsFromKv(ev.Kv) + job, err := models.GetJobFromKv(ev.Kv) if err != nil { log.Warnf(err.Error()) continue } - prevJob, err := models.GetJobsFromKv(ev.PrevKv) + prevJob, err := models.GetJobFromKv(ev.PrevKv) if err != nil { log.Warnf(err.Error()) continue @@ -150,20 +207,16 @@ func (n *Node) watchJobs() { } // 此结点暂停或不再执行此 job - if len(prevJob.Schedule(n.ID, n.groups)) > 0 { - n.delJob(prevJob) - } + n.delJob(prevJob) case ev.Type == client.EventTypeDelete: - prevJob, err := models.GetJobsFromKv(ev.PrevKv) + prevJob, err := models.GetJobFromKv(ev.PrevKv) if err != nil { log.Warnf(err.Error()) continue } - if len(prevJob.Schedule(n.ID, n.groups)) > 0 { - n.delJob(prevJob) - } + n.delJob(prevJob) default: log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key)) @@ -177,7 +230,21 @@ func (n *Node) watchGroups() { rch := models.WatchJobs() for wresp := range rch { for _, ev := range wresp.Events { - _ = ev + switch { + case ev.IsCreate(): + g, err := models.GetGroupFromKv(ev.Kv) + if err != nil { + log.Warnf(err.Error()) + continue + } + + n.addGroup(g) + + case ev.IsModify(): + case ev.Type == client.EventTypeDelete: + default: + log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key)) + } } } } @@ -192,14 +259,10 @@ func (n *Node) Run() (err error) { } }() - if n.groups, err = models.GetGroups(n.ID); err != nil { - return - } - if n.jobs, err = newJob(n.ID, n.groups); err != nil { + if err = n.addJobs(); err != nil { return } - n.addJobs() n.Cron.Start() go n.watchJobs() go n.watchGroups()