node: 修改 & 删除 job

pull/1/head
miraclesu 2017-01-19 19:03:18 +08:00
parent 89855291f3
commit db1692fe5e
3 changed files with 93 additions and 8 deletions

View File

@ -1,13 +1,13 @@
package models
import (
"strings"
"time"
"golang.org/x/net/context"
client "github.com/coreos/etcd/clientv3"
"strings"
"sunteng/cronsun/conf"
)

View File

@ -2,9 +2,11 @@ package models
import (
"encoding/json"
"fmt"
"strings"
client "github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"sunteng/commons/log"
"sunteng/cronsun/conf"
@ -79,6 +81,18 @@ func GetJobs() (jobs map[string]*Job, err error) {
return
}
func WatchJobs() client.WatchChan {
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
}
func GetJobsFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
job = new(Job)
if err = json.Unmarshal(kv.Value, job); err != nil {
err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error())
}
return
}
func (j *Job) BuildSchedules(gs map[string]*Group) {
j.Schedules = make(map[string]string)
for _, r := range j.Rule {

View File

@ -85,16 +85,86 @@ func (n *Node) Register() (err error) {
func (n *Node) addJobs() {
for _, job := range n.jobs {
sch, ok := job.Schedule(n.ID)
if !ok {
continue
}
if err := n.Cron.AddJob(sch, job); err != nil {
log.Warnf("job[%s] timer[%s] parse err: %s", job.ID, sch)
n.addJob(job)
}
}
func (n *Node) addJob(job *models.Job) bool {
sch, ok := job.Schedule(n.ID)
if !ok {
return false
}
if err := n.Cron.AddJob(sch, job); err != nil {
log.Warnf("job[%s] timer[%s] parse err: %s", job.ID, sch)
return false
}
return true
}
func (n *Node) delJob(job *models.Job) {
n.Cron.DelJob(job)
}
func (n *Node) watchJobs() {
rch := models.WatchJobs()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
job, err := models.GetJobsFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
job.BuildSchedules(n.groups)
n.addJob(job)
case ev.IsModify():
job, err := models.GetJobsFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
prevJob, err := models.GetJobsFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
job.BuildSchedules(n.groups)
prevJob.BuildSchedules(n.groups)
if n.addJob(job) {
continue
}
// 此结点不再执行此 job
if _, ok := prevJob.Schedule(n.ID); ok {
n.delJob(prevJob)
}
case ev.Type == client.EventTypeDelete:
prevJob, err := models.GetJobsFromKv(ev.PrevKv)
if err != nil {
log.Warnf(err.Error())
continue
}
prevJob.BuildSchedules(n.groups)
if _, ok := prevJob.Schedule(n.ID); ok {
n.delJob(prevJob)
}
default:
log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
}
}
}
}
// TODO
func (n *Node) watchGroups() {}
// 启动服务
func (n *Node) Run() (err error) {
go n.keepAlive()
@ -113,8 +183,9 @@ func (n *Node) Run() (err error) {
}
n.addJobs()
// TODO add&del job
n.Cron.Start()
go n.watchJobs()
go n.watchGroups()
return
}