job: 支持配置只有一台机器执行任务

pull/1/head
miraclesu 2017-03-20 15:20:44 +08:00
parent c207e96a51
commit 3f9bd07b1b
4 changed files with 54 additions and 1 deletions

View File

@ -46,6 +46,7 @@ type Conf struct {
Proc string // 当前执行任务路径
Cmd string // cmd 路径
Once string // 马上执行任务路径
Lock string // job lock 路径
Group string // 节点分组
Ttl int64 // 节点超时时间,单位秒
@ -166,7 +167,7 @@ func (c *Conf) reload() {
}
// etcd key 选项需要重启
cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Group
cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Lock, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Lock, c.Group
*c = *cf
log.Noticef("config file[%s] reload success", *confFile)

View File

@ -4,6 +4,7 @@
"Proc": "/cronsun/proc/",
"Cmd": "/cronsun/cmd/",
"Once": "/cronsun/once/",
"Lock": "/cronsun/lock/",
"Group": "/cronsun/group/",
"#Ttl": "节点超时时间,单位秒",
"Ttl": 10,

View File

@ -92,6 +92,27 @@ func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveRespons
return c.Client.KeepAliveOnce(ctx, id)
}
func (c *Client) GetLock(key string) (bool, error) {
key = conf.Config.Lock + key
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
resp, err := DefalutClient.Txn(ctx).
If(client.Compare(client.CreateRevision(key), "=", 0)).
Then(client.OpPut(key, "")).
Commit()
cancel()
if err != nil {
return false, err
}
return resp.Succeeded, nil
}
func (c *Client) DelLock(key string) error {
_, err := c.Delete(conf.Config.Lock + key)
return err
}
func IsValidAsKeyPath(s string) bool {
return strings.IndexByte(s, '/') == -1
}

View File

@ -26,6 +26,11 @@ const (
DefaultJobGroup = "default"
)
const (
KindCommon = iota
KindAlone // 单机执行
)
// 需要执行的 cron cmd 命令
// 注册到 /cronsun/cmd/groupName/<id>
type Job struct {
@ -46,6 +51,10 @@ type Job struct {
// 执行任务失败重试时间间隔
// 单位秒,如果不大于 0 则马上重试
Interval int `json:"interval"`
// 任务类型
// 0: 普通任务
// 1: 单机任务
Kind int `json:"kind"`
// 平均执行时间,单位 ms
AvgTime int64 `json:"avg_time"`
@ -225,6 +234,20 @@ func (j *Job) unlimit() {
atomic.AddInt64(&j.count, -1)
}
func (j *Job) lock() bool {
ok, err := DefalutClient.GetLock(j.ID)
if err != nil {
log.Noticef("job[%s] didn't get a lock", j.Key())
}
return ok
}
func (j *Job) unlock() {
if err := DefalutClient.DelLock(j.ID); err != nil {
log.Noticef("job[%s] del a lock err: %s", j.Key(), err.Error())
}
}
// Run 执行任务
func (j *Job) Run() bool {
var (
@ -256,6 +279,13 @@ func (j *Job) Run() bool {
}
}
if j.Kind == KindAlone {
if !j.lock() {
return true
}
defer j.unlock()
}
// 超时控制
if j.Timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second)