node: 调整 node 结构

方便 group 修改时处理相应的 jobs
pull/1/head
miraclesu 2017-01-20 17:11:26 +08:00
parent c5bb3b4ce6
commit d2640709bb
3 changed files with 105 additions and 30 deletions

View File

@ -2,9 +2,11 @@ package models
import (
"encoding/json"
"fmt"
"strings"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"sunteng/commons/log"
"sunteng/cronsun/conf"
@ -63,6 +65,14 @@ func WatchGroups() client.WatchChan {
return DefalutClient.Watch(conf.Config.Group, client.WithPrefix())
}
func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) {
g = new(Group)
if err = json.Unmarshal(kv.Value, g); err != nil {
err = fmt.Errorf("group[%s] umarshal err: %s", string(kv.Key), err.Error())
}
return
}
func DeleteGroupById(id string) (*client.DeleteResponse, error) {
return DefalutClient.Delete(GroupKey(id))
}

View File

@ -30,6 +30,7 @@ type Job struct {
// 每个任务在单个结点上只支持一个时间规则
// 如果需要多个时间规则,需建新的任务
schedule string
gid string
build bool
}
@ -40,20 +41,20 @@ type JobRule struct {
ExcludeNodeIDs []string `json:"exclude_nids"`
}
func (j *JobRule) included(nid string, gs map[string]*Group) bool {
func (j *JobRule) included(nid string, gs map[string]*Group) (string, bool) {
for _, gid := range j.GroupIDs {
if _, ok := gs[gid]; ok {
return true
return gid, true
}
}
for i, count := 0, len(j.NodeIDs); i < count; i++ {
if nid == j.NodeIDs[i] {
return true
return "", true
}
}
return false
return "", false
}
func GetJob(group, id string) (job *Job, err error) {
@ -102,7 +103,7 @@ func WatchJobs() client.WatchChan {
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
}
func GetJobsFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
job = new(Job)
if err = json.Unmarshal(kv.Value, job); err != nil {
err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error())
@ -110,20 +111,21 @@ func GetJobsFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
return
}
func (j *Job) Schedule(nid string, gs map[string]*Group) string {
func (j *Job) Schedule(nid string, gs map[string]*Group, rebuild bool) (string, string) {
if j.Pause {
return ""
return "", ""
}
if j.build {
return j.schedule
if j.build && !rebuild {
return j.schedule, j.gid
}
j.buildSchedule(nid, gs)
return j.schedule
return j.schedule, j.gid
}
func (j *Job) buildSchedule(nid string, gs map[string]*Group) {
j.build = true
for _, r := range j.Rule {
for _, id := range r.ExcludeNodeIDs {
if nid == id {
@ -131,8 +133,8 @@ func (j *Job) buildSchedule(nid string, gs map[string]*Group) {
}
}
if r.included(nid, gs) {
j.schedule = r.Timer
if gid, ok := r.included(nid, gs); ok {
j.schedule, j.gid = r.Timer, gid
return
}
}

View File

@ -25,6 +25,9 @@ type Node struct {
jobs Job
groups Group
// map[group id]map[job id]bool
// 用于 group 发生变化的时候修改相应的 job
link map[string]map[string]bool
ttl int64
@ -83,14 +86,50 @@ func (n *Node) Register() (err error) {
return
}
func (n *Node) addJobs() {
func (n *Node) addJobs() (err error) {
if n.groups, err = models.GetGroups(n.ID); err != nil {
return
}
if n.jobs, err = newJob(n.ID, n.groups); err != nil {
return
}
n.link = make(map[string]map[string]bool, len(n.groups))
for _, job := range n.jobs {
n.addJob(job)
}
return
}
func (n *Node) addLink(gid, jid string) {
if len(gid) == 0 {
return
}
js, ok := n.link[gid]
if !ok {
js = make(map[string]bool, 4)
n.link[gid] = js
}
js[jid] = true
}
func (n *Node) delLink(gid, jid string) {
if len(gid) == 0 {
return
}
js, ok := n.link[gid]
if !ok {
return
}
delete(js, jid)
}
func (n *Node) addJob(job *models.Job) bool {
sch := job.Schedule(n.ID, n.groups)
sch, gid := job.Schedule(n.ID, n.groups, false)
if len(sch) == 0 {
return false
}
@ -111,21 +150,39 @@ func (n *Node) addJob(job *models.Job) bool {
return false
}
n.addLink(gid, j.GetID())
return true
}
func (n *Node) delJob(job *models.Job) {
sch, gid := job.Schedule(n.ID, n.groups, false)
if len(sch) == 0 {
return
}
n.delLink(gid, job.GetID())
delete(n.jobs, job.GetID())
n.Cron.DelJob(job)
}
func (n *Node) addGroup(g *models.Group) {
if !g.Included(n.ID) {
return
}
return
}
func (n *Node) delGroup(g *models.Group) {
delete(n.groups, g.ID)
}
func (n *Node) watchJobs() {
rch := models.WatchJobs()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
job, err := models.GetJobsFromKv(ev.Kv)
job, err := models.GetJobFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
@ -134,12 +191,12 @@ func (n *Node) watchJobs() {
n.addJob(job)
case ev.IsModify():
job, err := models.GetJobsFromKv(ev.Kv)
job, err := models.GetJobFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
prevJob, err := models.GetJobsFromKv(ev.PrevKv)
prevJob, err := models.GetJobFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
@ -150,20 +207,16 @@ func (n *Node) watchJobs() {
}
// 此结点暂停或不再执行此 job
if len(prevJob.Schedule(n.ID, n.groups)) > 0 {
n.delJob(prevJob)
}
case ev.Type == client.EventTypeDelete:
prevJob, err := models.GetJobsFromKv(ev.PrevKv)
prevJob, err := models.GetJobFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
if len(prevJob.Schedule(n.ID, n.groups)) > 0 {
n.delJob(prevJob)
}
default:
log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
@ -177,7 +230,21 @@ func (n *Node) watchGroups() {
rch := models.WatchJobs()
for wresp := range rch {
for _, ev := range wresp.Events {
_ = ev
switch {
case ev.IsCreate():
g, err := models.GetGroupFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
n.addGroup(g)
case ev.IsModify():
case ev.Type == client.EventTypeDelete:
default:
log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key))
}
}
}
}
@ -192,14 +259,10 @@ func (n *Node) Run() (err error) {
}
}()
if n.groups, err = models.GetGroups(n.ID); err != nil {
return
}
if n.jobs, err = newJob(n.ID, n.groups); err != nil {
if err = n.addJobs(); err != nil {
return
}
n.addJobs()
n.Cron.Start()
go n.watchJobs()
go n.watchGroups()