job: 支持马上运行任务

pull/1/head
miraclesu 2017-03-09 17:43:08 +08:00
parent d18c59f6cc
commit 174e3baee5
5 changed files with 76 additions and 0 deletions

View File

@ -45,6 +45,7 @@ type Conf struct {
Node string // node 进程路径
Proc string // 当前执行任务路径
Cmd string // cmd 路径
Once string // 马上执行任务路径
Group string // 节点分组
Ttl int64 // 节点超时时间,单位秒
@ -158,6 +159,9 @@ func (c *Conf) reload() {
return
}
// etcd key 选项需要重启
cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Group
*c = *cf
log.Noticef("config file[%s] reload success", *confFile)
return

View File

@ -3,6 +3,7 @@
"Node": "/cronsun/node/",
"Proc": "/cronsun/proc/",
"Cmd": "/cronsun/cmd/",
"Once": "/cronsun/once/",
"Group": "/cronsun/group/",
"Ttl": 10,
"ReqTimeout": 2,

View File

@ -6,6 +6,7 @@ import (
"fmt"
"os/exec"
"os/user"
"runtime"
"strconv"
"strings"
"syscall"
@ -247,6 +248,18 @@ func (j *Job) Run() {
j.Success(t, b.String())
}
func (j *Job) RunWithRecovery() {
defer func() {
if r := recover(); r != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
log.Warnf("panic running job: %v\n%s", r, buf)
}
}()
j.Run()
}
// 从 etcd 的 key 中取 id
func GetIDFromKey(key string) string {
index := strings.LastIndex(key, "/")
@ -336,6 +349,22 @@ func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) {
return
}
func (j Job) IsRunOn(nid string, gs map[string]*Group) bool {
for _, r := range j.Rules {
for _, id := range r.ExcludeNodeIDs {
if nid == id {
continue
}
}
if r.included(nid, gs) {
return true
}
}
return false
}
// 安全选项验证
func (j *Job) Valid() error {
if len(j.cmd) == 0 {

21
models/once.go Normal file
View File

@ -0,0 +1,21 @@
package models
import (
client "github.com/coreos/etcd/clientv3"
"sunteng/cronsun/conf"
)
// 马上执行 job 任务
// 注册到 /cronsun/once/group/<jobID>
// value
// 若执行单个结点,则值为 NodeID
// 若 job 所在的结点都需执行,则值为空 ""
func PutOnce(group, jobID, nodeID string) error {
_, err := DefalutClient.Put(conf.Config.Once+group+"/"+jobID, nodeID)
return err
}
func WatchOnce() client.WatchChan {
return DefalutClient.Watch(conf.Config.Once, client.WithPrefix())
}

View File

@ -389,6 +389,27 @@ func (n *Node) watchGroups() {
}
}
func (n *Node) watchOnce() {
rch := models.WatchOnce()
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsCreate(), ev.IsModify():
if len(ev.Kv.Value) != 0 && string(ev.Kv.Value) != n.ID {
continue
}
job, ok := n.jobs[models.GetIDFromKey(string(ev.Kv.Key))]
if !ok || job.IsRunOn(n.ID, n.groups) {
continue
}
go job.RunWithRecovery()
}
}
}
}
// 启动服务
func (n *Node) Run() (err error) {
go n.keepAlive()