diff --git a/models/job.go b/models/job.go index dd76dfb..c22a366 100644 --- a/models/job.go +++ b/models/job.go @@ -35,7 +35,7 @@ type Job struct { Rules []*JobRule `json:"rules"` Pause bool `json:"pause"` // 可手工控制的状态 Timeout int64 `json:"timeout"` // 任务执行时间超时设置,大于 0 时有效 - Parallels int `json:"parallels"` // 设置任务在单个节点上可以同时允许多少个,针对两次任务执行间隔比任务执行时间要长的任务启用 + Parallels int64 `json:"parallels"` // 设置任务在单个节点上可以同时允许多少个,针对两次任务执行间隔比任务执行时间要长的任务启用 // 执行任务的结点,用于记录 job log runOn string @@ -174,6 +174,7 @@ func (j *Job) Run() { ) t := time.Now() + // 用户权限控制 if len(j.User) > 0 { u, err := user.Lookup(j.User) if err != nil { @@ -183,10 +184,8 @@ func (j *Job) Run() { uid, err := strconv.Atoi(u.Uid) if err != nil { - if err != nil { - j.Fail(t, "not support run with user on windows") - return - } + j.Fail(t, "not support run with user on windows") + return } gid, _ := strconv.Atoi(u.Gid) sysProcAttr = &syscall.SysProcAttr{ @@ -197,6 +196,21 @@ func (j *Job) Run() { } } + // 同时允许任务进程数控制 + if j.Parallels > 0 { + count, err := j.CountRunning() + if err != nil { + j.Fail(t, fmt.Sprintf("count job[%s] running on[%s] err: %s", j.Key(), j.runOn, err.Error())) + return + } + + if j.Parallels <= count { + j.Fail(t, fmt.Sprintf("job[%s] running on[%s] limit:[%d]", j.Key(), j.runOn, count)) + return + } + } + + // 超时控制 if j.Timeout > 0 { ctx, cancel := context.WithTimeout(context.Background(), time.Duration(j.Timeout)*time.Second) defer cancel() diff --git a/models/proc.go b/models/proc.go index 3411bd1..34761be 100644 --- a/models/proc.go +++ b/models/proc.go @@ -146,9 +146,8 @@ func (p *Process) Val() string { } // 获取结点正在执行任务的数量 -func (p *Process) Count() (int64, error) { - key := p.Key() - resp, err := DefalutClient.Get(key[:len(key)-len(p.ID)]) +func (j *Job) CountRunning() (int64, error) { + resp, err := DefalutClient.Get(conf.Config.Proc + j.runOn + "/" + j.Group + "/" + j.ID) if err != nil { return 0, err }