2017-03-06 09:34:30 +00:00
|
|
|
|
package models
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
2017-03-10 03:06:40 +00:00
|
|
|
|
"fmt"
|
2017-03-08 09:00:22 +00:00
|
|
|
|
"sync"
|
2017-03-06 09:34:30 +00:00
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
client "github.com/coreos/etcd/clientv3"
|
|
|
|
|
|
2017-03-10 03:06:40 +00:00
|
|
|
|
"strings"
|
2017-03-07 09:07:19 +00:00
|
|
|
|
"sunteng/commons/log"
|
2017-03-06 09:34:30 +00:00
|
|
|
|
"sunteng/cronsun/conf"
|
|
|
|
|
)
|
|
|
|
|
|
2017-03-07 09:07:19 +00:00
|
|
|
|
var (
|
2017-03-08 09:00:22 +00:00
|
|
|
|
lID *leaseID
|
2017-03-07 09:07:19 +00:00
|
|
|
|
)
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
// 维持 lease id 服务
|
|
|
|
|
func StartProc() error {
|
|
|
|
|
lID = &leaseID{
|
|
|
|
|
ttl: conf.Config.ProcTtl,
|
|
|
|
|
lk: new(sync.RWMutex),
|
|
|
|
|
done: make(chan struct{}),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if lID.ttl == 0 {
|
2017-03-07 09:07:19 +00:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
err := lID.set()
|
|
|
|
|
go lID.keepAlive()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Reload(i interface{}) {
|
|
|
|
|
if lID.ttl == conf.Config.ProcTtl {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
close(lID.done)
|
|
|
|
|
lID.done, lID.ttl = make(chan struct{}), conf.Config.ProcTtl
|
|
|
|
|
if conf.Config.ProcTtl == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := lID.set(); err != nil {
|
|
|
|
|
log.Warnf("proc lease id set err: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
go lID.keepAlive()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func Exit(i interface{}) {
|
|
|
|
|
if lID.done != nil {
|
|
|
|
|
close(lID.done)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type leaseID struct {
|
|
|
|
|
ttl int64
|
|
|
|
|
ID client.LeaseID
|
|
|
|
|
lk *sync.RWMutex
|
|
|
|
|
|
|
|
|
|
done chan struct{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *leaseID) get() client.LeaseID {
|
|
|
|
|
if l.ttl == 0 {
|
|
|
|
|
return -1
|
2017-03-07 09:07:19 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
l.lk.RLock()
|
|
|
|
|
id := l.ID
|
|
|
|
|
l.lk.RUnlock()
|
|
|
|
|
return id
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *leaseID) set() error {
|
|
|
|
|
id := client.LeaseID(-1)
|
|
|
|
|
resp, err := DefalutClient.Grant(context.TODO(), l.ttl+2)
|
|
|
|
|
if err == nil {
|
|
|
|
|
id = resp.ID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
l.lk.Lock()
|
|
|
|
|
l.ID = id
|
|
|
|
|
l.lk.Unlock()
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (l *leaseID) keepAlive() {
|
2017-03-10 03:06:40 +00:00
|
|
|
|
duration := time.Duration(l.ttl) * time.Second
|
2017-03-08 09:00:22 +00:00
|
|
|
|
timer := time.NewTimer(duration)
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-l.done:
|
|
|
|
|
return
|
|
|
|
|
case <-timer.C:
|
|
|
|
|
if l.ttl == 0 {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id := l.get()
|
|
|
|
|
if id < 0 {
|
|
|
|
|
if err := l.set(); err != nil {
|
|
|
|
|
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
|
|
|
|
|
}
|
|
|
|
|
timer.Reset(duration)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_, err := DefalutClient.KeepAliveOnce(context.TODO(), l.ID)
|
|
|
|
|
if err == nil {
|
|
|
|
|
timer.Reset(duration)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Warnf("proc lease id keepAlive err: %s, try to reset...", err.Error())
|
|
|
|
|
if err = l.set(); err != nil {
|
|
|
|
|
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
|
|
|
|
|
}
|
|
|
|
|
timer.Reset(duration)
|
|
|
|
|
}
|
|
|
|
|
}
|
2017-03-07 09:07:19 +00:00
|
|
|
|
}
|
|
|
|
|
|
2017-03-06 09:34:30 +00:00
|
|
|
|
// Process describes one job process that is currently executing.
//
// key:   /cronsun/proc/node/group/jobId/pid
// value: the time execution started
//
// The key expires automatically, so that it does not linger when the process
// exits unexpectedly without removing it; the expiry time is configurable.
type Process struct {
	// ID is the pid.
	ID     string `json:"id"`
	JobID  string `json:"jobId"`
	Group  string `json:"group"`
	NodeID string `json:"nodeId"`
	// Time is when execution started.
	Time time.Time `json:"time"`

	// running is set by Start after the key was stored successfully and is
	// checked by Stop.
	running bool
}
|
|
|
|
|
|
2017-03-10 03:06:40 +00:00
|
|
|
|
func GetProcFromKey(key string) (proc *Process, err error) {
|
|
|
|
|
ss := strings.Split(key, "/")
|
|
|
|
|
var sslen = len(ss)
|
|
|
|
|
if sslen < 5 {
|
|
|
|
|
err = fmt.Errorf("invalid proc key [%s]", err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
proc = &Process{
|
|
|
|
|
ID: ss[sslen-1],
|
|
|
|
|
JobID: ss[sslen-2],
|
|
|
|
|
Group: ss[sslen-3],
|
|
|
|
|
NodeID: ss[sslen-4],
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-06 09:34:30 +00:00
|
|
|
|
func (p *Process) Key() string {
|
2017-03-08 09:00:22 +00:00
|
|
|
|
return conf.Config.Proc + p.NodeID + "/" + p.Group + "/" + p.JobID + "/" + p.ID
|
2017-03-06 09:34:30 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Process) Val() string {
|
|
|
|
|
return p.Time.Format(time.RFC3339)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 获取结点正在执行任务的数量
|
2017-03-09 08:12:32 +00:00
|
|
|
|
func (j *Job) CountRunning() (int64, error) {
|
2017-03-10 03:06:40 +00:00
|
|
|
|
resp, err := DefalutClient.Get(conf.Config.Proc+j.runOn+"/"+j.Group+"/"+j.ID, client.WithPrefix(), client.WithCountOnly())
|
2017-03-06 09:34:30 +00:00
|
|
|
|
if err != nil {
|
|
|
|
|
return 0, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return resp.Count, nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
func (p *Process) put() error {
|
|
|
|
|
id := lID.get()
|
|
|
|
|
if id < 0 {
|
2017-03-06 09:34:30 +00:00
|
|
|
|
_, err := DefalutClient.Put(p.Key(), p.Val())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
_, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
|
2017-03-07 09:07:19 +00:00
|
|
|
|
return err
|
|
|
|
|
}
|
2017-03-06 09:34:30 +00:00
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
func (p *Process) del() error {
|
2017-03-07 09:07:19 +00:00
|
|
|
|
_, err := DefalutClient.Delete(p.Key())
|
2017-03-06 09:34:30 +00:00
|
|
|
|
return err
|
|
|
|
|
}
|
2017-03-07 03:42:08 +00:00
|
|
|
|
|
2017-03-07 09:07:19 +00:00
|
|
|
|
func (p *Process) Start() {
|
2017-03-10 03:55:46 +00:00
|
|
|
|
if p == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
if err := p.put(); err != nil {
|
2017-03-07 09:07:19 +00:00
|
|
|
|
log.Warnf("proc put err: %s", err.Error())
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
p.running = true
|
2017-03-07 03:42:08 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (p *Process) Stop() error {
|
2017-03-10 03:55:46 +00:00
|
|
|
|
if p == nil || !p.running {
|
2017-03-07 09:07:19 +00:00
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-08 09:00:22 +00:00
|
|
|
|
return p.del()
|
2017-03-07 03:42:08 +00:00
|
|
|
|
}
|