job: 优化正在执行任务信息规则&同时执行任务数控制

pull/1/head
miraclesu 2017-03-13 16:23:01 +08:00
parent aec8b17621
commit 2976114527
3 changed files with 85 additions and 34 deletions

View File

@ -101,10 +101,6 @@ func (c *Conf) parse() error {
return err
}
// 转为 ms
if c.ProcReq > 0 {
c.ProcReq *= 1000
}
if c.Etcd.DialTimeout > 0 {
c.Etcd.DialTimeout *= time.Second
}

View File

@ -9,6 +9,7 @@ import (
"runtime"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
@ -46,6 +47,8 @@ type Job struct {
runOn string
// 用于存储分隔后的任务
cmd []string
// 控制同时执行任务数
count int64
}
type JobRule struct {
@ -171,8 +174,35 @@ func (j *Job) String() string {
return string(data)
}
func (j *Job) limit() bool {
if j.Parallels == 0 {
return false
}
count := atomic.LoadInt64(&j.count)
if j.Parallels <= count {
j.Fail(time.Now(), fmt.Sprintf("job[%s] running on[%s] running:[%d]", j.Key(), j.runOn, count))
return true
}
atomic.AddInt64(&j.count, 1)
return false
}
func (j *Job) unlimit() {
if j.Parallels == 0 {
return
}
atomic.AddInt64(&j.count, -1)
}
// Run 执行任务
func (j *Job) Run() {
if j.limit() {
return
}
defer j.unlimit()
var (
cmd *exec.Cmd
proc *Process
@ -202,20 +232,6 @@ func (j *Job) Run() {
}
}
// 同时允许任务进程数控制
if j.Parallels > 0 {
count, err := j.CountRunning()
if err != nil {
j.Fail(t, fmt.Sprintf("count job[%s] running on[%s] err: %s", j.Key(), j.runOn, err.Error()))
return
}
if j.Parallels <= count {
j.Fail(t, fmt.Sprintf("job[%s] running on[%s] limit:[%d]", j.Key(), j.runOn, count))
return
}
}
// 超时控制
if j.Timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second)
@ -233,16 +249,14 @@ func (j *Job) Run() {
return
}
if j.AvgTime >= conf.Config.ProcReq {
proc = &Process{
ID: strconv.Itoa(cmd.Process.Pid),
JobID: j.ID,
Group: j.Group,
NodeID: j.runOn,
Time: t,
}
proc.Start()
proc = &Process{
ID: strconv.Itoa(cmd.Process.Pid),
JobID: j.ID,
Group: j.Group,
NodeID: j.runOn,
Time: t,
}
proc.Start()
if err := cmd.Wait(); err != nil {
proc.Stop()

View File

@ -137,6 +137,9 @@ type Process struct {
Time time.Time `json:"time"` // 开始执行时间
running bool
hasPut bool
timer *time.Timer
done chan struct{}
}
func GetProcFromKey(key string) (proc *Process, err error) {
@ -175,32 +178,60 @@ func (j *Job) CountRunning() (int64, error) {
}
func (p *Process) put() error {
if p.hasPut {
return nil
}
id := lID.get()
if id < 0 {
_, err := DefalutClient.Put(p.Key(), p.Val())
if err == nil {
p.hasPut = true
}
return err
}
_, err := DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
if err == nil {
p.hasPut = true
}
return err
}
func (p *Process) del() error {
if !p.hasPut {
return nil
}
_, err := DefalutClient.Delete(p.Key())
return err
}
func (p *Process) Start() {
if p == nil {
return
}
if err := p.put(); err != nil {
log.Warnf("proc put err: %s", err.Error())
if p == nil || p.running {
return
}
p.running = true
if conf.Config.ProcReq == 0 {
if err := p.put(); err != nil {
log.Warnf("proc put err: %s", err.Error())
}
return
}
p.timer = time.NewTimer(time.Duration(conf.Config.ProcReq) * time.Second)
p.done = make(chan struct{})
go func() {
select {
case <-p.done:
case <-p.timer.C:
if err := p.put(); err != nil {
log.Warnf("proc put err: %s", err.Error())
}
}
}()
}
func (p *Process) Stop() error {
@ -208,5 +239,15 @@ func (p *Process) Stop() error {
return nil
}
return p.del()
if p.done != nil {
close(p.done)
}
if p.timer != nil {
p.timer.Stop()
}
err := p.del()
p.running, p.hasPut = false, false
return err
}