node: 修改 job 加载规则

只有结点需要执行的 job 才加载到内存
pull/1/head
miraclesu 2017-01-20 12:26:02 +08:00
parent db1692fe5e
commit c5bb3b4ce6
4 changed files with 94 additions and 55 deletions

View File

@ -32,7 +32,9 @@ func GetGroupById(gid string) (g *Group, err error) {
return return
} }
func GetGroups() (groups map[string]*Group, err error) { // GetGroups 获取包含 nid 的 group
// 如果 nid 为空,则获取所有的 group
func GetGroups(nid string) (groups map[string]*Group, err error) {
resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix()) resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix())
if err != nil { if err != nil {
return return
@ -50,11 +52,17 @@ func GetGroups() (groups map[string]*Group, err error) {
log.Warnf("group[%s] umarshal err: %s", string(g.Key), e.Error()) log.Warnf("group[%s] umarshal err: %s", string(g.Key), e.Error())
continue continue
} }
if len(nid) == 0 || group.Included(nid) {
groups[group.ID] = group groups[group.ID] = group
} }
}
return return
} }
func WatchGroups() client.WatchChan {
return DefalutClient.Watch(conf.Config.Group, client.WithPrefix())
}
func DeleteGroupById(id string) (*client.DeleteResponse, error) { func DeleteGroupById(id string) (*client.DeleteResponse, error) {
return DefalutClient.Delete(GroupKey(id)) return DefalutClient.Delete(GroupKey(id))
} }
@ -89,3 +97,13 @@ func (g *Group) Check() error {
return nil return nil
} }
func (g *Group) Included(nid string) bool {
for i, count := 0, len(g.NodeIDs); i < count; i++ {
if nid == g.NodeIDs[i] {
return true
}
}
return false
}

View File

@ -26,10 +26,11 @@ type Job struct {
Rule []*JobRule `json:"rule"` Rule []*JobRule `json:"rule"`
Pause bool `json:"pause"` // 可手工控制的状态 Pause bool `json:"pause"` // 可手工控制的状态
// map[ip]timer node 服务使用 // node 服务使用
// 每个任务在单个结点上只支持一个时间规则 // 每个任务在单个结点上只支持一个时间规则
// 如果需要多个时间规则,需建新的任务 // 如果需要多个时间规则,需建新的任务
Schedules map[string]string `json:"-"` schedule string
build bool
} }
type JobRule struct { type JobRule struct {
@ -39,6 +40,22 @@ type JobRule struct {
ExcludeNodeIDs []string `json:"exclude_nids"` ExcludeNodeIDs []string `json:"exclude_nids"`
} }
func (j *JobRule) included(nid string, gs map[string]*Group) bool {
for _, gid := range j.GroupIDs {
if _, ok := gs[gid]; ok {
return true
}
}
for i, count := 0, len(j.NodeIDs); i < count; i++ {
if nid == j.NodeIDs[i] {
return true
}
}
return false
}
func GetJob(group, id string) (job *Job, err error) { func GetJob(group, id string) (job *Job, err error) {
resp, err := DefalutClient.Get(JobKey(group, id)) resp, err := DefalutClient.Get(JobKey(group, id))
if err != nil { if err != nil {
@ -93,44 +110,32 @@ func GetJobsFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
return return
} }
func (j *Job) BuildSchedules(gs map[string]*Group) { func (j *Job) Schedule(nid string, gs map[string]*Group) string {
j.Schedules = make(map[string]string) if j.Pause {
return ""
}
if j.build {
return j.schedule
}
j.buildSchedule(nid, gs)
return j.schedule
}
func (j *Job) buildSchedule(nid string, gs map[string]*Group) {
for _, r := range j.Rule { for _, r := range j.Rule {
for _, gid := range r.GroupIDs {
g, ok := gs[gid]
if !ok {
continue
}
for _, id := range g.NodeIDs {
if t, ok := j.Schedules[id]; ok {
log.Warnf("job[%s] already exists timer[%s], timer[%s] will skip", j.ID, t, r.Timer)
continue
}
j.Schedules[id] = r.Timer
}
}
for _, id := range r.NodeIDs {
if t, ok := j.Schedules[id]; ok {
log.Warnf("job[%s] already exists timer[%s], timer[%s] will skip", j.ID, t, r.Timer)
continue
}
j.Schedules[id] = r.Timer
}
for _, id := range r.ExcludeNodeIDs { for _, id := range r.ExcludeNodeIDs {
delete(j.Schedules, id) if nid == id {
} return
} }
} }
func (j *Job) Schedule(id string) (string, bool) { if r.included(nid, gs) {
if len(j.Schedules) == 0 { j.schedule = r.Timer
return "", false return
}
} }
s, ok := j.Schedules[id]
return s, ok
} }
func (j *Job) GetID() string { func (j *Job) GetID() string {

View File

@ -18,9 +18,8 @@ func newJob(id string, g Group) (j Job, err error) {
continue continue
} }
job.BuildSchedules(g) if len(job.Schedule(id, g)) > 0 {
if _, ok := job.Schedule(id); ok { j[job.GetID()] = job
j[job.ID] = job
} }
} }
return return

View File

@ -90,18 +90,32 @@ func (n *Node) addJobs() {
} }
func (n *Node) addJob(job *models.Job) bool { func (n *Node) addJob(job *models.Job) bool {
sch, ok := job.Schedule(n.ID) sch := job.Schedule(n.ID, n.groups)
if !ok { if len(sch) == 0 {
return false return false
} }
if err := n.Cron.AddJob(sch, job); err != nil {
log.Warnf("job[%s] timer[%s] parse err: %s", job.ID, sch) j, ok := n.jobs[job.GetID()]
if ok {
if j != job {
*j = *job
}
} else {
j = job
n.jobs[j.GetID()] = j
}
if err := n.Cron.AddJob(sch, j); err != nil {
log.Warnf("job[%s] timer[%s] parse err: %s", j.GetID(), sch)
delete(n.jobs, j.GetID())
return false return false
} }
return true return true
} }
func (n *Node) delJob(job *models.Job) { func (n *Node) delJob(job *models.Job) {
delete(n.jobs, job.GetID())
n.Cron.DelJob(job) n.Cron.DelJob(job)
} }
@ -117,7 +131,6 @@ func (n *Node) watchJobs() {
continue continue
} }
job.BuildSchedules(n.groups)
n.addJob(job) n.addJob(job)
case ev.IsModify(): case ev.IsModify():
@ -132,14 +145,12 @@ func (n *Node) watchJobs() {
continue continue
} }
job.BuildSchedules(n.groups)
prevJob.BuildSchedules(n.groups)
if n.addJob(job) { if n.addJob(job) {
continue continue
} }
// 此结点不再执行此 job
if _, ok := prevJob.Schedule(n.ID); ok { // 此结点暂停或不再执行此 job
if len(prevJob.Schedule(n.ID, n.groups)) > 0 {
n.delJob(prevJob) n.delJob(prevJob)
} }
@ -150,8 +161,7 @@ func (n *Node) watchJobs() {
continue continue
} }
prevJob.BuildSchedules(n.groups) if len(prevJob.Schedule(n.ID, n.groups)) > 0 {
if _, ok := prevJob.Schedule(n.ID); ok {
n.delJob(prevJob) n.delJob(prevJob)
} }
@ -163,7 +173,14 @@ func (n *Node) watchJobs() {
} }
// TODO // TODO
func (n *Node) watchGroups() {} func (n *Node) watchGroups() {
rch := models.WatchJobs()
for wresp := range rch {
for _, ev := range wresp.Events {
_ = ev
}
}
}
// 启动服务 // 启动服务
func (n *Node) Run() (err error) { func (n *Node) Run() (err error) {
@ -175,7 +192,7 @@ func (n *Node) Run() (err error) {
} }
}() }()
if n.groups, err = models.GetGroups(); err != nil { if n.groups, err = models.GetGroups(n.ID); err != nil {
return return
} }
if n.jobs, err = newJob(n.ID, n.groups); err != nil { if n.jobs, err = newJob(n.ID, n.groups); err != nil {