job: 增加任务执行中信息记录阀值

pull/1/head
miraclesu 2017-03-10 11:55:46 +08:00
parent 93e5d79839
commit 4f7a6625ee
5 changed files with 56 additions and 20 deletions

View File

@ -53,6 +53,9 @@ type Conf struct {
// 执行任务信息过期时间,单位秒
// 0 为不过期
ProcTtl int64
// 记录任务执行中的信息的执行时间阀值,单位秒
// 0 为不限制
ProcReq int64
Log *log.Config
Etcd client.Config
@ -98,6 +101,10 @@ func (c *Conf) parse() error {
return err
}
// 转为 ms
if c.ProcReq > 0 {
c.ProcReq *= 1000
}
if c.Etcd.DialTimeout > 0 {
c.Etcd.DialTimeout *= time.Second
}

View File

@ -5,9 +5,14 @@
"Cmd": "/cronsun/cmd/",
"Once": "/cronsun/once/",
"Group": "/cronsun/group/",
"#Ttl": "节点超时时间,单位秒",
"Ttl": 10,
"#ReqTimeout": "etcd 请求超时时间,单位秒",
"ReqTimeout": 2,
"#ProcTtl": "执行中的任务信息过期时间单位秒0 为不过期",
"ProcTtl": 600,
"#ProcReq": "记录任务执行中的信息的执行时间阀值单位秒0 为不限制",
"ProcReq": 5,
"Log": "@extend:log.json",
"Etcd": "@extend:etcd.json",
"Mgo": "@extend:db.json",

View File

@ -28,15 +28,19 @@ const (
// 需要执行的 cron cmd 命令
// 注册到 /cronsun/cmd/groupName/<id>
type Job struct {
ID string `json:"id"`
Name string `json:"name"`
Group string `json:"group"`
Command string `json:"cmd"`
User string `json:"user"`
Rules []*JobRule `json:"rules"`
Pause bool `json:"pause"` // 可手工控制的状态
Timeout int64 `json:"timeout"` // 任务执行时间超时设置,大于 0 时有效
Parallels int64 `json:"parallels"` // 设置任务在单个节点上可以同时允许多少个,针对两次任务执行间隔比任务执行时间要长的任务启用
ID string `json:"id"`
Name string `json:"name"`
Group string `json:"group"`
Command string `json:"cmd"`
User string `json:"user"`
Rules []*JobRule `json:"rules"`
Pause bool `json:"pause"` // 可手工控制的状态
Timeout int64 `json:"timeout"` // 任务执行时间超时设置,大于 0 时有效
// 设置任务在单个节点上可以同时允许多少个
// 针对两次任务执行间隔比任务执行时间要长的任务启用
Parallels int64 `json:"parallels"`
// 平均执行时间,单位 ms
AvgTime int64 `json:"avg_time"`
// 执行任务的结点,用于记录 job log
runOn string
@ -171,6 +175,7 @@ func (j *Job) String() string {
func (j *Job) Run() {
var (
cmd *exec.Cmd
proc *Process
sysProcAttr *syscall.SysProcAttr
)
@ -228,21 +233,23 @@ func (j *Job) Run() {
return
}
p := &Process{
ID: strconv.Itoa(cmd.Process.Pid),
JobID: j.ID,
Group: j.Group,
NodeID: j.runOn,
Time: t,
if j.AvgTime >= conf.Config.ProcReq {
proc = &Process{
ID: strconv.Itoa(cmd.Process.Pid),
JobID: j.ID,
Group: j.Group,
NodeID: j.runOn,
Time: t,
}
proc.Start()
}
p.Start()
if err := cmd.Wait(); err != nil {
p.Stop()
proc.Stop()
j.Fail(t, fmt.Sprintf("%s", err.Error()))
return
}
p.Stop()
proc.Stop()
j.Success(t, b.String())
}
@ -323,6 +330,16 @@ func (j *Job) Fail(t time.Time, msg string) {
CreateJobLog(j, t, msg, false)
}
func (j *Job) Avg(t, et time.Time) {
execTime := int64(et.Sub(t) / time.Millisecond)
if j.AvgTime == 0 {
j.AvgTime = execTime
return
}
j.AvgTime = (j.AvgTime + execTime) / 2
}
func (j *Job) Cmds(nid string, gs map[string]*Group) (cmds map[string]*Cmd) {
cmds = make(map[string]*Cmd)
if j.Pause {

View File

@ -82,6 +82,9 @@ func GetJobLatestLogListByJobIds(jobIds []string) (m map[string]*JobLatestLog, e
}
func CreateJobLog(j *Job, t time.Time, rs string, success bool) {
et := time.Now()
j.Avg(t, et)
jl := JobLog{
Id: bson.NewObjectId(),
JobId: j.ID,
@ -97,7 +100,7 @@ func CreateJobLog(j *Job, t time.Time, rs string, success bool) {
Success: success,
BeginTime: t,
EndTime: time.Now(),
EndTime: et,
}
if err := mgoDB.Insert(Coll_JobLog, jl); err != nil {
log.Error(err.Error())

View File

@ -191,6 +191,10 @@ func (p *Process) del() error {
}
func (p *Process) Start() {
if p == nil {
return
}
if err := p.put(); err != nil {
log.Warnf("proc put err: %s", err.Error())
return
@ -200,7 +204,7 @@ func (p *Process) Start() {
}
func (p *Process) Stop() error {
if !p.running {
if p == nil || !p.running {
return nil
}