From 2e907ece29b9e662c5b1e07a822114ed28a05c81 Mon Sep 17 00:00:00 2001 From: miraclesu Date: Fri, 17 Feb 2017 17:09:46 +0800 Subject: [PATCH] =?UTF-8?q?node:=20=E8=B0=83=E6=95=B4=20job=20rule=20?= =?UTF-8?q?=E8=A7=A3=E9=87=8A=E8=A7=84=E5=88=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 一个结点支持一个 job 多个执行时间 --- bin/node/server.go | 1 - models/job.go | 56 ++++++++++++++++++++++++++++++++++++++-------- node/job.go | 6 ++--- node/node.go | 55 ++++++++++++++++++++++++++++++++++++++------- 4 files changed, 97 insertions(+), 21 deletions(-) diff --git a/bin/node/server.go b/bin/node/server.go index 303f219..f8d5649 100644 --- a/bin/node/server.go +++ b/bin/node/server.go @@ -24,7 +24,6 @@ func main() { //set cpu usage runtime.GOMAXPROCS(*gomax) - // models.InitPwd() if err := models.Init(); err != nil { log.Error(err.Error()) return diff --git a/models/job.go b/models/job.go index 55ff051..ba470ef 100644 --- a/models/job.go +++ b/models/job.go @@ -44,26 +44,39 @@ type Job struct { } type JobRule struct { - Id string `json:"id"` + ID string `json:"id"` Timer string `json:"timer"` GroupIDs []string `json:"gids"` NodeIDs []string `json:"nids"` ExcludeNodeIDs []string `json:"exclude_nids"` } -func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) { - for _, gid := range j.GroupIDs { - if _, ok := gs[gid]; ok { - return gid, true - } - } +type Cmd struct { + *Job + *JobRule + + Schedule string + Gid string +} +func (c *Cmd) GetID() string { + return c.Job.ID + c.JobRule.ID +} + +// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理 +func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) { for i, count := 0, len(j.NodeIDs); i < count; i++ { if nid == j.NodeIDs[i] { return "", true } } + for _, gid := range j.GroupIDs { + if _, ok := gs[gid]; ok { + return gid, true + } + } + return "", false } @@ -241,9 +254,9 @@ func (j *Job) Check() error { j.User = strings.TrimSpace(j.User) for i := range j.Rules { - id := strings.TrimSpace(j.Rules[i].Id) + id := strings.TrimSpace(j.Rules[i].ID) if id == "" || strings.HasPrefix(id, "NEW") { - j.Rules[i].Id = NextID() + j.Rules[i].ID = NextID() } } @@ -263,3 +276,28 @@ func (j *Job) Success(t time.Time, out string) { func (j *Job) Fail(t time.Time, msg string) { CreateJobLog(j, t, msg, false) } + +func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) { + cmds = make(map[string]*Cmd) + for _, r := range j.Rules { + for _, id := range r.ExcludeNodeIDs { + if nid == id { + continue + } + } + + if gid, ok := r.included(nid, gs); ok { + cmd := &Cmd{ + Job: j, + JobRule: r, + + Schedule: r.Timer, + Gid: gid, + } + j.RunOn(nid) + cmds[cmd.GetID()] = cmd + } + } + + return +} diff --git a/node/job.go b/node/job.go index 00218ca..55ac1c5 100644 --- a/node/job.go +++ b/node/job.go @@ -4,15 +4,15 @@ import ( "sunteng/cronsun/models" ) -type Job map[string]*models.Job +type Jobs map[string]*models.Job -func newJob(id string, g Group) (j Job, err error) { +func loadJobs(id string, g Group) (j Jobs, err error) { jobs, err := models.GetJobs() if err != nil { return } - j = make(Job, len(jobs)) + j = make(Jobs, len(jobs)) for _, job := range jobs { if sch, _ := job.Schedule(id, g, false); len(sch) > 0 { j[job.GetID()] = job diff --git a/node/node.go b/node/node.go index 73a8bf5..35e859b 100644 --- a/node/node.go +++ b/node/node.go @@ -23,8 +23,9 @@ type Node struct { *models.Node *cron.Cron - jobs Job + jobs Jobs groups Group + cmds map[string]*models.Cmd // map[group id]map[job id]bool // 用于 group 发生变化的时候修改相应的 job link map[string]map[string]bool @@ -51,6 +52,8 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) { }, Cron: cron.New(), + cmds: make(map[string]*models.Cmd), + ttl: cfg.Ttl, done: make(chan struct{}), } @@ -86,17 +89,53 @@ func (n *Node) Register() (err error) { return } -func (n *Node) addJobs() (err error) { +func (n *Node) loadJobs() (err error) { if n.groups, err = models.GetGroups(n.ID); err != nil { return } - if n.jobs, err = newJob(n.ID, n.groups); err != nil { + if n.jobs, err = loadJobs(n.ID, n.groups); err != nil { + return + } + + n.addCmds() + return +} + +func (n *Node) addCmds() { + if len(n.jobs) == 0 { return } - n.link = make(map[string]map[string]bool, len(n.groups)) for _, job := range n.jobs { - n.addJob(job) + for _, cmd := range job.Cmds(n.ID, n.groups) { + n.addCmd(cmd) + } + } +} + +func (n *Node) addCmd(cmd *models.Cmd) { + c, ok := n.cmds[cmd.GetID()] + if ok { + sch := c.Schedule + *c = *cmd + + // 节点执行时间不变,不用更新 cron + if c.Schedule == sch { + return + } + } else { + c = cmd + } + + if err := n.Cron.AddJob(c.Schedule, c); err != nil { + msg := fmt.Sprintf("job[%s] rule[%s] timer[%s] parse err: %s", c.Job.ID, c.JobRule.ID, c.Schedule, err.Error()) + log.Warn(msg) + c.Fail(time.Now(), msg) + return + } + + if !ok { + n.cmds[c.GetID()] = c } return } @@ -298,13 +337,13 @@ func (n *Node) Run() (err error) { } }() - if err = n.addJobs(); err != nil { + if err = n.loadJobs(); err != nil { return } n.Cron.Start() - go n.watchJobs() - go n.watchGroups() + // go n.watchJobs() + // go n.watchGroups() n.Node.On() return }