cronsun/node/node.go

443 lines
7.6 KiB
Go
Raw Normal View History

2017-01-04 09:27:36 +00:00
package node
import (
2017-01-05 12:35:41 +00:00
"fmt"
2017-01-05 02:37:48 +00:00
"os"
2017-01-05 12:35:41 +00:00
"strconv"
"time"
"golang.org/x/net/context"
2017-01-05 02:37:48 +00:00
2017-01-04 09:27:36 +00:00
client "github.com/coreos/etcd/clientv3"
2017-01-05 02:37:48 +00:00
2017-01-06 08:44:42 +00:00
"sunteng/commons/log"
2017-01-05 02:37:48 +00:00
"sunteng/commons/util"
2017-01-05 12:35:41 +00:00
"sunteng/cronsun/conf"
2017-01-09 10:51:52 +00:00
"sunteng/cronsun/models"
"sunteng/cronsun/node/cron"
2017-01-05 12:35:41 +00:00
)
2017-01-04 09:27:36 +00:00
// Node 执行 cron 命令服务的结构体
type Node struct {
2017-01-09 10:51:52 +00:00
*models.Client
*models.Node
*cron.Cron
jobs Jobs
groups Groups
cmds map[string]*models.Cmd
link
2017-02-17 13:29:04 +00:00
// 删除的 job id用于 group 更新
delIDs map[string]bool
2017-01-05 02:37:48 +00:00
2017-01-09 10:51:52 +00:00
ttl int64
2017-01-05 12:35:41 +00:00
lID client.LeaseID // lease id
2017-01-06 08:44:42 +00:00
lch <-chan *client.LeaseKeepAliveResponse
done chan struct{}
2017-01-04 09:27:36 +00:00
}
2017-01-06 08:44:42 +00:00
func NewNode(cfg *conf.Conf) (n *Node, err error) {
2017-01-05 02:37:48 +00:00
ip, err := util.GetLocalIP()
if err != nil {
return
}
n = &Node{
2017-01-09 10:51:52 +00:00
Client: models.DefalutClient,
Node: &models.Node{
ID: ip.String(),
PID: strconv.Itoa(os.Getpid()),
},
Cron: cron.New(),
2017-01-06 08:44:42 +00:00
cmds: make(map[string]*models.Cmd),
2017-01-09 10:51:52 +00:00
ttl: cfg.Ttl,
2017-01-06 08:44:42 +00:00
done: make(chan struct{}),
2017-01-05 02:37:48 +00:00
}
return
2017-01-04 09:27:36 +00:00
}
// 注册到 /cronsun/proc/xx
2017-01-05 12:35:41 +00:00
func (n *Node) Register() (err error) {
2017-01-09 10:51:52 +00:00
pid, err := n.Node.Exist()
2017-01-05 12:35:41 +00:00
if err != nil {
return
}
if pid != -1 {
2017-01-09 10:51:52 +00:00
return fmt.Errorf("node[%s] pid[%d] exist", n.Node.ID, pid)
2017-01-05 12:35:41 +00:00
}
2017-01-06 08:59:31 +00:00
resp, err := n.Client.Grant(context.TODO(), n.ttl)
2017-01-05 12:35:41 +00:00
if err != nil {
return
}
2017-01-09 10:51:52 +00:00
if _, err = n.Node.Put(client.WithLease(resp.ID)); err != nil {
2017-01-05 12:35:41 +00:00
return
}
2017-01-06 08:44:42 +00:00
ch, err := n.Client.KeepAlive(context.TODO(), resp.ID)
if err != nil {
2017-01-05 12:35:41 +00:00
return
}
2017-01-06 08:44:42 +00:00
n.lID, n.lch = resp.ID, ch
2017-01-05 12:35:41 +00:00
return
2017-01-04 09:27:36 +00:00
}
func (n *Node) loadJobs() (err error) {
if n.groups, err = models.GetGroups(""); err != nil {
return
}
2017-02-17 13:29:04 +00:00
jobs, err := models.GetJobs()
if err != nil {
return
}
n.jobs, n.link = make(Jobs, len(jobs)), newLink(len(n.groups))
2017-02-17 13:29:04 +00:00
if len(jobs) == 0 {
2017-02-17 10:16:47 +00:00
return
}
2017-02-17 13:29:04 +00:00
for _, job := range jobs {
job.RunOn(n.ID)
2017-02-17 10:16:47 +00:00
n.addJob(job, false)
}
2017-02-17 13:29:04 +00:00
return
}
2017-02-17 10:16:47 +00:00
func (n *Node) addJob(job *models.Job, notice bool) {
2017-02-23 04:13:14 +00:00
n.link.addJob(job)
2017-02-17 13:29:04 +00:00
2017-02-17 10:16:47 +00:00
cmds := job.Cmds(n.ID, n.groups)
if len(cmds) == 0 {
return
}
n.jobs[job.ID] = job
2017-02-17 10:16:47 +00:00
for _, cmd := range cmds {
n.addCmd(cmd, notice)
}
2017-02-17 10:16:47 +00:00
return
}
func (n *Node) delJob(id string) {
job, ok := n.jobs[id]
// 之前此任务没有在当前结点执行
if !ok {
2017-02-17 13:29:04 +00:00
n.delIDs[id] = true
return
}
2017-02-17 13:29:04 +00:00
delete(n.jobs, id)
2017-02-23 04:13:14 +00:00
n.link.delJob(job)
2017-02-17 13:29:04 +00:00
2017-02-17 10:16:47 +00:00
cmds := job.Cmds(n.ID, n.groups)
if len(cmds) == 0 {
return
}
for _, cmd := range cmds {
n.delCmd(cmd)
}
return
}
func (n *Node) modJob(job *models.Job) {
oJob, ok := n.jobs[job.ID]
// 之前此任务没有在当前结点执行,直接增加任务
if !ok {
n.addJob(job, true)
return
}
2017-02-23 04:13:14 +00:00
n.link.delJob(oJob)
prevCmds := oJob.Cmds(n.ID, n.groups)
*oJob = *job
cmds := oJob.Cmds(n.ID, n.groups)
2017-02-17 10:16:47 +00:00
for id, cmd := range cmds {
n.addCmd(cmd, true)
delete(prevCmds, id)
}
for _, cmd := range prevCmds {
n.delCmd(cmd)
}
2017-02-17 13:29:04 +00:00
2017-02-23 04:13:14 +00:00
n.link.addJob(oJob)
2017-02-17 10:16:47 +00:00
}
func (n *Node) addCmd(cmd *models.Cmd, notice bool) {
c, ok := n.cmds[cmd.GetID()]
if ok {
sch := c.JobRule.Timer
*c = *cmd
// 节点执行时间不变,不用更新 cron
if c.JobRule.Timer == sch {
return
}
} else {
c = cmd
}
if err := n.Cron.AddJob(c.JobRule.Timer, c); err != nil {
msg := fmt.Sprintf("job[%s] rule[%s] timer[%s] parse err: %s", c.Job.ID, c.JobRule.ID, c.JobRule.Timer, err.Error())
log.Warn(msg)
c.Fail(time.Now(), msg)
return
}
if !ok {
n.cmds[c.GetID()] = c
2017-01-19 11:03:18 +00:00
}
2017-02-17 10:16:47 +00:00
if notice {
log.Noticef("job[%s] rule[%s] timer[%s] has added", c.Job.ID, c.JobRule.ID, c.JobRule.Timer)
2017-02-17 10:16:47 +00:00
}
return
}
2017-02-17 10:16:47 +00:00
func (n *Node) delCmd(cmd *models.Cmd) {
delete(n.cmds, cmd.GetID())
n.Cron.DelJob(cmd)
log.Noticef("job[%s] rule[%s] timer[%s] has deleted", cmd.Job.ID, cmd.JobRule.ID, cmd.JobRule.Timer)
2017-02-17 10:16:47 +00:00
}
2017-02-17 13:29:04 +00:00
// addGroup stores g in n.groups under its ID, replacing any previous entry.
func (n *Node) addGroup(g *models.Group) {
	n.groups[g.ID] = g
}
func (n *Node) delGroup(id string) {
delete(n.groups, id)
n.link.delGroup(id)
2017-02-17 13:29:04 +00:00
job, ok := n.jobs[id]
// 之前此任务没有在当前结点执行
if !ok {
return
}
2017-01-20 09:42:50 +00:00
2017-02-17 13:29:04 +00:00
cmds := job.Cmds(n.ID, n.groups)
if len(cmds) == 0 {
return
2017-01-20 09:42:50 +00:00
}
2017-02-17 13:29:04 +00:00
for _, cmd := range cmds {
n.delCmd(cmd)
}
return
}
2017-02-17 13:29:04 +00:00
func (n *Node) modGroup(g *models.Group) {
oGroup, ok := n.groups[g.ID]
if !ok {
n.addGroup(g)
2017-01-20 09:42:50 +00:00
return
}
2017-02-17 13:29:04 +00:00
// 都包含/都不包含当前节点,对当前节点任务无影响
if (oGroup.Included(n.ID) && g.Included(n.ID)) || (!oGroup.Included(n.ID) && !g.Included(n.ID)) {
*oGroup = *g
return
}
// 增加当前节点
if !oGroup.Included(n.ID) && g.Included(n.ID) {
2017-02-23 07:59:21 +00:00
n.groupAddNode(g)
return
}
// 移除当前节点
n.groupRmNode(g, oGroup)
return
}
func (n *Node) groupAddNode(g *models.Group) {
n.groups[g.ID] = g
jls := n.link[g.ID]
if len(jls) == 0 {
return
}
2017-02-17 13:29:04 +00:00
2017-02-23 07:59:21 +00:00
var err error
for jid, jl := range jls {
job, ok := n.jobs[jid]
if !ok {
// job 已删除
if n.delIDs[jid] {
n.link.delGroupJob(g.ID, jid)
continue
2017-02-17 13:29:04 +00:00
}
2017-02-23 07:59:21 +00:00
if job, err = models.GetJob(jl.gname, jid); err != nil {
log.Warnf("get job[%s][%s] err: %s", jl.gname, jid, err.Error())
n.link.delGroupJob(g.ID, jid)
continue
2017-02-17 13:29:04 +00:00
}
}
2017-02-23 07:59:21 +00:00
cmds := job.Cmds(n.ID, n.groups)
for _, cmd := range cmds {
n.addCmd(cmd, true)
}
2017-02-17 13:29:04 +00:00
}
2017-02-23 07:59:21 +00:00
return
}
2017-02-17 13:29:04 +00:00
2017-02-23 07:59:21 +00:00
func (n *Node) groupRmNode(g, og *models.Group) {
jls := n.link[g.ID]
if len(jls) == 0 {
2017-02-17 13:29:04 +00:00
n.groups[g.ID] = g
return
}
for jid, _ := range jls {
2017-02-17 13:29:04 +00:00
job, ok := n.jobs[jid]
if !ok {
// 数据出错
log.Warnf("WTF! group[%s] job[%s]", g.ID, jid)
2017-02-23 04:13:14 +00:00
n.link.delGroupJob(g.ID, jid)
2017-02-17 13:29:04 +00:00
continue
}
2017-02-23 07:59:21 +00:00
n.groups[og.ID] = og
2017-02-17 13:29:04 +00:00
prevCmds := job.Cmds(n.ID, n.groups)
n.groups[g.ID] = g
cmds := job.Cmds(n.ID, n.groups)
for id, cmd := range cmds {
n.addCmd(cmd, true)
delete(prevCmds, id)
}
for _, cmd := range prevCmds {
n.delCmd(cmd)
}
}
n.groups[g.ID] = g
}
2017-01-19 11:03:18 +00:00
func (n *Node) watchJobs() {
rch := models.WatchJobs()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
job, err := models.GetJobFromKv(ev.Kv)
2017-01-19 11:03:18 +00:00
if err != nil {
log.Warnf(err.Error())
continue
}
job.RunOn(n.ID)
2017-02-17 10:16:47 +00:00
n.addJob(job, true)
2017-01-19 11:03:18 +00:00
case ev.IsModify():
job, err := models.GetJobFromKv(ev.Kv)
2017-01-19 11:03:18 +00:00
if err != nil {
log.Warnf(err.Error())
continue
}
job.RunOn(n.ID)
n.modJob(job)
2017-01-19 11:03:18 +00:00
case ev.Type == client.EventTypeDelete:
2017-02-17 13:29:04 +00:00
n.delJob(models.GetIDFromKey(string(ev.Kv.Key)))
2017-01-19 11:03:18 +00:00
default:
log.Warnf("unknown event type[%v] from job[%s]", ev.Type, string(ev.Kv.Key))
}
}
}
}
func (n *Node) watchGroups() {
rch := models.WatchGroups()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate():
g, err := models.GetGroupFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
n.addGroup(g)
case ev.IsModify():
2017-01-20 09:42:50 +00:00
g, err := models.GetGroupFromKv(ev.Kv)
if err != nil {
log.Warnf(err.Error())
continue
}
2017-02-17 13:29:04 +00:00
n.modGroup(g)
case ev.Type == client.EventTypeDelete:
2017-02-17 13:29:04 +00:00
n.delGroup(models.GetIDFromKey(string(ev.Kv.Key)))
default:
log.Warnf("unknown event type[%v] from group[%s]", ev.Type, string(ev.Kv.Key))
}
}
}
}
2017-01-19 11:03:18 +00:00
2017-01-05 02:37:48 +00:00
// 启动服务
func (n *Node) Run() (err error) {
2017-01-06 08:44:42 +00:00
go n.keepAlive()
defer func() {
if err != nil {
n.Stop(nil)
}
}()
if err = n.loadJobs(); err != nil {
return
}
n.Cron.Start()
2017-02-17 10:16:47 +00:00
go n.watchJobs()
2017-02-17 13:29:04 +00:00
go n.watchGroups()
n.Node.On()
return
2017-01-06 08:44:42 +00:00
}
// 断网掉线重新注册
func (n *Node) keepAlive() {
for {
for _ = range n.lch {
}
2017-01-05 02:37:48 +00:00
2017-01-06 08:44:42 +00:00
select {
case <-n.done:
return
default:
}
time.Sleep(time.Duration(n.ttl+1) * time.Second)
log.Noticef("%s has dropped, try to reconnect...", n.String())
if err := n.Register(); err != nil {
log.Warn(err.Error())
} else {
log.Noticef("%s reconnected", n.String())
}
}
2017-01-05 02:37:48 +00:00
}
2017-01-18 08:27:35 +00:00
// 停止服务
2017-01-05 02:37:48 +00:00
func (n *Node) Stop(i interface{}) {
n.Node.Down()
2017-01-06 08:44:42 +00:00
close(n.done)
2017-01-09 10:51:52 +00:00
n.Node.Del()
2017-01-05 02:37:48 +00:00
n.Client.Close()
n.Cron.Stop()
2017-01-05 02:37:48 +00:00
}