node: 调整 job rule 解释规则

一个结点支持一个 job 多个执行时间
pull/1/head
miraclesu 8 years ago
parent cf70099fd2
commit 2e907ece29

@ -24,7 +24,6 @@ func main() {
//set cpu usage
runtime.GOMAXPROCS(*gomax)
// models.InitPwd()
if err := models.Init(); err != nil {
log.Error(err.Error())
return

@ -44,26 +44,39 @@ type Job struct {
}
type JobRule struct {
Id string `json:"id"`
ID string `json:"id"`
Timer string `json:"timer"`
GroupIDs []string `json:"gids"`
NodeIDs []string `json:"nids"`
ExcludeNodeIDs []string `json:"exclude_nids"`
}
func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) {
for _, gid := range j.GroupIDs {
if _, ok := gs[gid]; ok {
return gid, true
}
}
type Cmd struct {
*Job
*JobRule
Schedule string
Gid string
}
func (c *Cmd) GetID() string {
return c.Job.ID + c.JobRule.ID
}
// 优先取结点里的值,更新 group 时可用 gid 判断是否对 job 进行处理
func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) {
for i, count := 0, len(j.NodeIDs); i < count; i++ {
if nid == j.NodeIDs[i] {
return "", true
}
}
for _, gid := range j.GroupIDs {
if _, ok := gs[gid]; ok {
return gid, true
}
}
return "", false
}
@ -241,9 +254,9 @@ func (j *Job) Check() error {
j.User = strings.TrimSpace(j.User)
for i := range j.Rules {
id := strings.TrimSpace(j.Rules[i].Id)
id := strings.TrimSpace(j.Rules[i].ID)
if id == "" || strings.HasPrefix(id, "NEW") {
j.Rules[i].Id = NextID()
j.Rules[i].ID = NextID()
}
}
@ -263,3 +276,28 @@ func (j *Job) Success(t time.Time, out string) {
func (j *Job) Fail(t time.Time, msg string) {
CreateJobLog(j, t, msg, false)
}
func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) {
cmds = make(map[string]*Cmd)
for _, r := range j.Rules {
for _, id := range r.ExcludeNodeIDs {
if nid == id {
continue
}
}
if gid, ok := r.included(nid, gs); ok {
cmd := &Cmd{
Job: j,
JobRule: r,
Schedule: r.Timer,
Gid: gid,
}
j.RunOn(nid)
cmds[cmd.GetID()] = cmd
}
}
return
}

@ -4,15 +4,15 @@ import (
"sunteng/cronsun/models"
)
type Job map[string]*models.Job
type Jobs map[string]*models.Job
func newJob(id string, g Group) (j Job, err error) {
func loadJobs(id string, g Group) (j Jobs, err error) {
jobs, err := models.GetJobs()
if err != nil {
return
}
j = make(Job, len(jobs))
j = make(Jobs, len(jobs))
for _, job := range jobs {
if sch, _ := job.Schedule(id, g, false); len(sch) > 0 {
j[job.GetID()] = job

@ -23,8 +23,9 @@ type Node struct {
*models.Node
*cron.Cron
jobs Job
jobs Jobs
groups Group
cmds map[string]*models.Cmd
// map[group id]map[job id]bool
// 用于 group 发生变化的时候修改相应的 job
link map[string]map[string]bool
@ -51,6 +52,8 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) {
},
Cron: cron.New(),
cmds: make(map[string]*models.Cmd),
ttl: cfg.Ttl,
done: make(chan struct{}),
}
@ -86,17 +89,53 @@ func (n *Node) Register() (err error) {
return
}
func (n *Node) addJobs() (err error) {
func (n *Node) loadJobs() (err error) {
if n.groups, err = models.GetGroups(n.ID); err != nil {
return
}
if n.jobs, err = newJob(n.ID, n.groups); err != nil {
if n.jobs, err = loadJobs(n.ID, n.groups); err != nil {
return
}
n.addCmds()
return
}
func (n *Node) addCmds() {
if len(n.jobs) == 0 {
return
}
n.link = make(map[string]map[string]bool, len(n.groups))
for _, job := range n.jobs {
n.addJob(job)
for _, cmd := range job.Cmds(n.ID, n.groups) {
n.addCmd(cmd)
}
}
}
func (n *Node) addCmd(cmd *models.Cmd) {
c, ok := n.cmds[cmd.GetID()]
if ok {
sch := c.Schedule
*c = *cmd
// 节点执行时间不变,不用更新 cron
if c.Schedule == sch {
return
}
} else {
c = cmd
}
if err := n.Cron.AddJob(c.Schedule, c); err != nil {
msg := fmt.Sprintf("job[%s] rule[%s] timer[%s] parse err: %s", c.Job.ID, c.JobRule.ID, c.Schedule, err.Error())
log.Warn(msg)
c.Fail(time.Now(), msg)
return
}
if !ok {
n.cmds[c.GetID()] = c
}
return
}
@ -298,13 +337,13 @@ func (n *Node) Run() (err error) {
}
}()
if err = n.addJobs(); err != nil {
if err = n.loadJobs(); err != nil {
return
}
n.Cron.Start()
go n.watchJobs()
go n.watchGroups()
// go n.watchJobs()
// go n.watchGroups()
n.Node.On()
return
}

Loading…
Cancel
Save