From 3f9bd07b1b99dc1dae27e98673b61fd15b1780ed Mon Sep 17 00:00:00 2001 From: miraclesu Date: Mon, 20 Mar 2017 15:20:44 +0800 Subject: [PATCH] =?UTF-8?q?job:=20=E6=94=AF=E6=8C=81=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E5=8F=AA=E6=9C=89=E4=B8=80=E5=8F=B0=E6=9C=BA=E5=99=A8=E6=89=A7?= =?UTF-8?q?=E8=A1=8C=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/conf.go | 3 ++- conf/files/base.json.sample | 1 + models/client.go | 21 +++++++++++++++++++++ models/job.go | 30 ++++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 1 deletion(-) diff --git a/conf/conf.go b/conf/conf.go index f2e8a4d..79ca8d4 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -46,6 +46,7 @@ type Conf struct { Proc string // 当前执行任务路径 Cmd string // cmd 路径 Once string // 马上执行任务路径 + Lock string // job lock 路径 Group string // 节点分组 Ttl int64 // 节点超时时间,单位秒 @@ -166,7 +167,7 @@ func (c *Conf) reload() { } // etcd key 选项需要重启 - cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Group + cf.Node, cf.Proc, cf.Cmd, cf.Once, cf.Lock, cf.Group = c.Node, c.Proc, c.Cmd, c.Once, c.Lock, c.Group *c = *cf log.Noticef("config file[%s] reload success", *confFile) diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample index e6778dc..0daf4a6 100644 --- a/conf/files/base.json.sample +++ b/conf/files/base.json.sample @@ -4,6 +4,7 @@ "Proc": "/cronsun/proc/", "Cmd": "/cronsun/cmd/", "Once": "/cronsun/once/", + "Lock": "/cronsun/lock/", "Group": "/cronsun/group/", "#Ttl": "节点超时时间,单位秒", "Ttl": 10, diff --git a/models/client.go b/models/client.go index dc282ab..8f2f48a 100644 --- a/models/client.go +++ b/models/client.go @@ -92,6 +92,27 @@ func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveRespons return c.Client.KeepAliveOnce(ctx, id) } +func (c *Client) GetLock(key string) (bool, error) { + key = conf.Config.Lock + key + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + resp, err := DefalutClient.Txn(ctx). + If(client.Compare(client.CreateRevision(key), "=", 0)). + Then(client.OpPut(key, "")). + Commit() + cancel() + + if err != nil { + return false, err + } + + return resp.Succeeded, nil +} + +func (c *Client) DelLock(key string) error { + _, err := c.Delete(conf.Config.Lock + key) + return err +} + func IsValidAsKeyPath(s string) bool { return strings.IndexByte(s, '/') == -1 } diff --git a/models/job.go b/models/job.go index fd1cd16..13274d1 100644 --- a/models/job.go +++ b/models/job.go @@ -26,6 +26,11 @@ const ( DefaultJobGroup = "default" ) +const ( + KindCommon = iota + KindAlone // 单机执行 +) + // 需要执行的 cron cmd 命令 // 注册到 /cronsun/cmd/groupName/ type Job struct { @@ -46,6 +51,10 @@ type Job struct { // 执行任务失败重试时间间隔 // 单位秒,如果不大于 0 则马上重试 Interval int `json:"interval"` + // 任务类型 + // 0: 普通任务 + // 1: 单机任务 + Kind int `json:"kind"` // 平均执行时间,单位 ms AvgTime int64 `json:"avg_time"` @@ -225,6 +234,20 @@ func (j *Job) unlimit() { atomic.AddInt64(&j.count, -1) } +func (j *Job) lock() bool { + ok, err := DefalutClient.GetLock(j.ID) + if err != nil { + log.Noticef("job[%s] didn't get a lock", j.Key()) + } + return ok +} + +func (j *Job) unlock() { + if err := DefalutClient.DelLock(j.ID); err != nil { + log.Noticef("job[%s] del a lock err: %s", j.Key(), err.Error()) + } +} + // Run 执行任务 func (j *Job) Run() bool { var ( @@ -256,6 +279,13 @@ func (j *Job) Run() bool { } } + if j.Kind == KindAlone { + if !j.lock() { + return true + } + defer j.unlock() + } + // 超时控制 if j.Timeout > 0 { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second)