diff --git a/conf/conf.go b/conf/conf.go
index f5369d9..f622fc7 100644
--- a/conf/conf.go
+++ b/conf/conf.go
@@ -45,6 +45,7 @@ type Conf struct {
 	Node  string // node 进程路径
 	Proc  string // 当前执行任务路径
 	Cmd   string // cmd 路径
+	Once  string // etcd key prefix for run-once (immediate) job requests
 	Group string // 节点分组
 
 	Ttl int64 // 节点超时时间,单位秒
@@ -158,6 +159,10 @@ func (c *Conf) reload() {
 		return
 	}
 
+	// The etcd key prefixes only take effect after a restart, so a reload
+	// keeps the values that are currently in use.
+	cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Group
+
 	*c = *cf
 	log.Noticef("config file[%s] reload success", *confFile)
 	return
diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample
index 6db342a..e095ff9 100644
--- a/conf/files/base.json.sample
+++ b/conf/files/base.json.sample
@@ -3,6 +3,7 @@
     "Node": "/cronsun/node/",
     "Proc": "/cronsun/proc/",
     "Cmd": "/cronsun/cmd/",
+    "Once": "/cronsun/once/",
     "Group": "/cronsun/group/",
     "Ttl": 10,
     "ReqTimeout": 2,
diff --git a/models/job.go b/models/job.go
index c22a366..a27ee23 100644
--- a/models/job.go
+++ b/models/job.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"os/exec"
 	"os/user"
+	"runtime"
 	"strconv"
 	"strings"
 	"syscall"
@@ -247,6 +248,22 @@ func (j *Job) Run() {
 	j.Success(t, b.String())
 }
 
+// RunWithRecovery runs the job via Run and recovers from any panic raised
+// while it runs, logging the panic value and the stack trace of the current
+// goroutine instead of letting the panic propagate.
+func (j *Job) RunWithRecovery() {
+	defer func() {
+		if r := recover(); r != nil {
+			// Cap the captured stack trace at 64 KiB.
+			const size = 64 << 10
+			buf := make([]byte, size)
+			buf = buf[:runtime.Stack(buf, false)]
+			log.Warnf("panic running job: %v\n%s", r, buf)
+		}
+	}()
+	j.Run()
+}
+
 // 从 etcd 的 key 中取 id
 func GetIDFromKey(key string) string {
 	index := strings.LastIndex(key, "/")
@@ -336,6 +353,28 @@ func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) {
 	return
 }
 
+// IsRunOn reports whether the job may run on the node nid: it returns true
+// as soon as one rule does not list nid in ExcludeNodeIDs and includes nid
+// (as decided by the rule's included method, given the groups gs).
+func (j Job) IsRunOn(nid string, gs map[string]*Group) bool {
+nextRule:
+	for _, r := range j.Rules {
+		for _, id := range r.ExcludeNodeIDs {
+			if nid == id {
+				// This rule excludes the node; a plain continue would only
+				// advance this inner loop, so jump to the next rule.
+				continue nextRule
+			}
+		}
+
+		if r.included(nid, gs) {
+			return true
+		}
+	}
+
+	return false
+}
+
 // 安全选项验证
 func (j *Job) Valid() error {
 	if len(j.cmd) == 0 {
diff --git a/models/once.go b/models/once.go
new file mode 100644
index 0000000..abb6d77
--- /dev/null
+++ b/models/once.go
@@ -0,0 +1,24 @@
+package models
+
+import (
+	client "github.com/coreos/etcd/clientv3"
+
+	"sunteng/cronsun/conf"
+)
+
+// PutOnce requests an immediate, one-off run of a job by writing the key
+// conf.Config.Once + group + "/" + jobID to etcd.
+//
+// The value stored under the key selects the target:
+//   - a node ID: only that node should run the job;
+//   - "" (empty): every node the job is assigned to should run it.
+func PutOnce(group, jobID, nodeID string) error {
+	_, err := DefalutClient.Put(conf.Config.Once+group+"/"+jobID, nodeID)
+	return err
+}
+
+// WatchOnce returns a watch channel for every key under the
+// conf.Config.Once prefix, i.e. for the requests written by PutOnce.
+func WatchOnce() client.WatchChan {
+	return DefalutClient.Watch(conf.Config.Once, client.WithPrefix())
+}
diff --git a/node/node.go b/node/node.go
index 418ec86..37de034 100644
--- a/node/node.go
+++ b/node/node.go
@@ -389,6 +389,35 @@ func (n *Node) watchGroups() {
 	}
 }
 
+// watchOnce consumes run-once requests from models.WatchOnce and starts the
+// requested job on this node when the request applies to it.
+func (n *Node) watchOnce() {
+	rch := models.WatchOnce()
+	for wresp := range rch {
+		for _, ev := range wresp.Events {
+			// Only a newly created or modified key is a request to run.
+			if !ev.IsCreate() && !ev.IsModify() {
+				continue
+			}
+
+			// A non-empty value names the single node that should run
+			// the job; skip requests addressed to another node.
+			if len(ev.Kv.Value) != 0 && string(ev.Kv.Value) != n.ID {
+				continue
+			}
+
+			// Run only jobs this node knows and is allowed to run; the
+			// job ID is extracted from the key by models.GetIDFromKey.
+			job, ok := n.jobs[models.GetIDFromKey(string(ev.Kv.Key))]
+			if !ok || !job.IsRunOn(n.ID, n.groups) {
+				continue
+			}
+
+			go job.RunWithRecovery()
+		}
+	}
+}
+
 // 启动服务
 func (n *Node) Run() (err error) {
 	go n.keepAlive()