job: 增加同时运行进程数限制

pull/1/head
miraclesu 2017-03-09 16:12:32 +08:00
parent c8f06e843e
commit d18c59f6cc
2 changed files with 21 additions and 8 deletions

View File

@ -35,7 +35,7 @@ type Job struct {
Rules []*JobRule `json:"rules"`
Pause bool `json:"pause"` // 可手工控制的状态
Timeout int64 `json:"timeout"` // 任务执行时间超时设置,大于 0 时有效
Parallels int `json:"parallels"` // 设置任务在单个节点上可以同时允许多少个,针对两次任务执行间隔比任务执行时间要长的任务启用
Parallels int64 `json:"parallels"` // 设置任务在单个节点上可以同时允许多少个,针对两次任务执行间隔比任务执行时间要长的任务启用
// 执行任务的结点,用于记录 job log
runOn string
@ -174,6 +174,7 @@ func (j *Job) Run() {
)
t := time.Now()
// 用户权限控制
if len(j.User) > 0 {
u, err := user.Lookup(j.User)
if err != nil {
@ -183,10 +184,8 @@ func (j *Job) Run() {
uid, err := strconv.Atoi(u.Uid)
if err != nil {
if err != nil {
j.Fail(t, "not support run with user on windows")
return
}
j.Fail(t, "not support run with user on windows")
return
}
gid, _ := strconv.Atoi(u.Gid)
sysProcAttr = &syscall.SysProcAttr{
@ -197,6 +196,21 @@ func (j *Job) Run() {
}
}
// 同时允许任务进程数控制
if j.Parallels > 0 {
count, err := j.CountRunning()
if err != nil {
j.Fail(t, fmt.Sprintf("count job[%s] running on[%s] err: %s", j.Key(), j.runOn, err.Error()))
return
}
if j.Parallels <= count {
j.Fail(t, fmt.Sprintf("job[%s] running on[%s] limit:[%d]", j.Key(), j.runOn, count))
return
}
}
// 超时控制
if j.Timeout > 0 {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second)
defer cancel()

View File

@ -146,9 +146,8 @@ func (p *Process) Val() string {
}
// 获取结点正在执行任务的数量
func (p *Process) Count() (int64, error) {
key := p.Key()
resp, err := DefalutClient.Get(key[:len(key)-len(p.ID)])
func (j *Job) CountRunning() (int64, error) {
resp, err := DefalutClient.Get(conf.Config.Proc + j.runOn + "/" + j.Group + "/" + j.ID)
if err != nil {
return 0, err
}