cronsun/proc.go

257 lines
4.6 KiB
Go
Raw Normal View History

2017-05-12 06:48:24 +00:00
package cronsun
2017-03-06 09:34:30 +00:00
import (
2017-03-10 03:06:40 +00:00
"fmt"
2017-05-12 07:38:50 +00:00
"strings"
2017-03-08 09:00:22 +00:00
"sync"
"sync/atomic"
2017-03-06 09:34:30 +00:00
"time"
client "github.com/coreos/etcd/clientv3"
2017-05-09 10:27:32 +00:00
"github.com/shunfei/cronsun/conf"
2017-05-12 07:38:50 +00:00
"github.com/shunfei/cronsun/log"
2017-03-06 09:34:30 +00:00
)
var (
2017-03-08 09:00:22 +00:00
lID *leaseID
)
2017-03-08 09:00:22 +00:00
// 维持 lease id 服务
func StartProc() error {
lID = &leaseID{
ttl: conf.Config.ProcTtl,
lk: new(sync.RWMutex),
done: make(chan struct{}),
}
if lID.ttl == 0 {
return nil
}
2017-03-08 09:00:22 +00:00
err := lID.set()
go lID.keepAlive()
return err
}
func Reload(i interface{}) {
if lID.ttl == conf.Config.ProcTtl {
return
}
close(lID.done)
lID.done, lID.ttl = make(chan struct{}), conf.Config.ProcTtl
if conf.Config.ProcTtl == 0 {
return
}
if err := lID.set(); err != nil {
log.Warnf("proc lease id set err: %s", err.Error())
}
go lID.keepAlive()
}
func Exit(i interface{}) {
if lID.done != nil {
close(lID.done)
}
}
type leaseID struct {
ttl int64
ID client.LeaseID
lk *sync.RWMutex
done chan struct{}
}
func (l *leaseID) get() client.LeaseID {
if l.ttl == 0 {
return -1
}
2017-03-08 09:00:22 +00:00
l.lk.RLock()
id := l.ID
l.lk.RUnlock()
return id
}
func (l *leaseID) set() error {
id := client.LeaseID(-1)
2017-03-20 04:03:54 +00:00
resp, err := DefalutClient.Grant(l.ttl + 2)
2017-03-08 09:00:22 +00:00
if err == nil {
id = resp.ID
}
l.lk.Lock()
l.ID = id
l.lk.Unlock()
return err
}
func (l *leaseID) keepAlive() {
2017-03-10 03:06:40 +00:00
duration := time.Duration(l.ttl) * time.Second
2017-03-08 09:00:22 +00:00
timer := time.NewTimer(duration)
for {
select {
case <-l.done:
return
case <-timer.C:
if l.ttl == 0 {
return
}
id := l.get()
2017-03-14 07:23:53 +00:00
if id > 0 {
2017-03-20 04:03:54 +00:00
_, err := DefalutClient.KeepAliveOnce(l.ID)
2017-03-14 07:23:53 +00:00
if err == nil {
timer.Reset(duration)
continue
2017-03-08 09:00:22 +00:00
}
2017-03-14 07:23:53 +00:00
log.Warnf("proc lease id[%x] keepAlive err: %s, try to reset...", id, err.Error())
2017-03-08 09:00:22 +00:00
}
2017-03-14 07:23:53 +00:00
if err := l.set(); err != nil {
2017-03-08 09:00:22 +00:00
log.Warnf("proc lease id set err: %s, try to reset after %d seconds...", err.Error(), l.ttl)
2017-03-14 07:23:53 +00:00
} else {
2017-05-12 07:38:50 +00:00
log.Infof("proc set lease id[%x] success", l.get())
2017-03-08 09:00:22 +00:00
}
timer.Reset(duration)
}
}
}
2017-03-06 09:34:30 +00:00
// 当前执行中的任务信息
2017-03-10 03:06:40 +00:00
// key: /cronsun/proc/node/group/jobId/pid
2017-03-06 09:34:30 +00:00
// value: 开始执行时间
// key 会自动过期,防止进程意外退出后没有清除相关 key过期时间可配置
type Process struct {
2017-03-10 03:06:40 +00:00
ID string `json:"id"` // pid
JobID string `json:"jobId"`
2017-03-08 09:00:22 +00:00
Group string `json:"group"`
2017-03-10 03:06:40 +00:00
NodeID string `json:"nodeId"`
Time time.Time `json:"time"` // 开始执行时间
running int32
hasPut int32
wg sync.WaitGroup
done chan struct{}
2017-03-06 09:34:30 +00:00
}
2017-03-10 03:06:40 +00:00
func GetProcFromKey(key string) (proc *Process, err error) {
ss := strings.Split(key, "/")
var sslen = len(ss)
if sslen < 5 {
2017-03-16 02:30:31 +00:00
err = fmt.Errorf("invalid proc key [%s]", key)
2017-03-10 03:06:40 +00:00
return
}
proc = &Process{
ID: ss[sslen-1],
JobID: ss[sslen-2],
Group: ss[sslen-3],
NodeID: ss[sslen-4],
}
return
}
2017-03-06 09:34:30 +00:00
func (p *Process) Key() string {
2017-03-08 09:00:22 +00:00
return conf.Config.Proc + p.NodeID + "/" + p.Group + "/" + p.JobID + "/" + p.ID
2017-03-06 09:34:30 +00:00
}
func (p *Process) Val() string {
return p.Time.Format(time.RFC3339)
}
// 获取结点正在执行任务的数量
2017-03-09 08:12:32 +00:00
func (j *Job) CountRunning() (int64, error) {
2017-03-10 03:06:40 +00:00
resp, err := DefalutClient.Get(conf.Config.Proc+j.runOn+"/"+j.Group+"/"+j.ID, client.WithPrefix(), client.WithCountOnly())
2017-03-06 09:34:30 +00:00
if err != nil {
return 0, err
}
return resp.Count, nil
}
// put 出错也进行 del 操作
// 有可能某种原因put 命令已经发送到 etcd server
// 目前已知的 deadline 会出现此情况
func (p *Process) put() (err error) {
if atomic.LoadInt32(&p.running) != 1 {
return
}
if !atomic.CompareAndSwapInt32(&p.hasPut, 0, 1) {
return
}
2017-03-08 09:00:22 +00:00
id := lID.get()
if id < 0 {
if _, err = DefalutClient.Put(p.Key(), p.Val()); err != nil {
return
}
2017-03-06 09:34:30 +00:00
}
_, err = DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
return
}
2017-03-06 09:34:30 +00:00
2017-03-08 09:00:22 +00:00
func (p *Process) del() error {
if atomic.LoadInt32(&p.hasPut) != 1 {
return nil
}
_, err := DefalutClient.Delete(p.Key())
2017-03-06 09:34:30 +00:00
return err
}
func (p *Process) Start() {
if p == nil {
return
}
if !atomic.CompareAndSwapInt32(&p.running, 0, 1) {
return
}
if conf.Config.ProcReq == 0 {
if err := p.put(); err != nil {
log.Warnf("proc put[%s] err: %s", p.Key(), err.Error())
}
return
}
p.done = make(chan struct{})
p.wg.Add(1)
go func() {
select {
case <-p.done:
case <-time.After(time.Duration(conf.Config.ProcReq) * time.Second):
if err := p.put(); err != nil {
log.Warnf("proc put[%s] err: %s", p.Key(), err.Error())
}
}
p.wg.Done()
}()
}
func (p *Process) Stop() {
if p == nil {
return
}
if !atomic.CompareAndSwapInt32(&p.running, 1, 0) {
return
}
if p.done != nil {
close(p.done)
}
p.wg.Wait()
if err := p.del(); err != nil {
log.Warnf("proc del[%s] err: %s", p.Key(), err.Error())
}
}