Use struct instead of map, rewrite api of killing process

pull/106/head
Doflatango 2018-09-15 18:04:13 +08:00
parent 93220344ae
commit 0ef88c713d
6 changed files with 61 additions and 34 deletions

4
job.go
View File

@ -452,7 +452,9 @@ func (j *Job) Run() bool {
JobID: j.ID, JobID: j.ID,
Group: j.Group, Group: j.Group,
NodeID: j.runOn, NodeID: j.runOn,
Time: t, ProcessVal: ProcessVal{
Time: t,
},
} }
proc.Start() proc.Start()
defer proc.Stop() defer proc.Stop()

View File

@ -476,11 +476,12 @@ func (n *Node) watchExcutingProc() {
} }
val := string(ev.Kv.Value) val := string(ev.Kv.Value)
err = json.Unmarshal([]byte(val), process) pv := &cronsun.ProcessVal{}
err = json.Unmarshal([]byte(val), pv)
if err != nil { if err != nil {
continue continue
} }
process.ProcessVal = *pv
if process.Killed { if process.Killed {
n.KillExcutingProc(process) n.KillExcutingProc(process)
} }

31
proc.go
View File

@ -128,12 +128,13 @@ func (l *leaseID) keepAlive() {
// value: 开始执行时间 // value: 开始执行时间
// key 会自动过期,防止进程意外退出后没有清除相关 key过期时间可配置 // key 会自动过期,防止进程意外退出后没有清除相关 key过期时间可配置
type Process struct { type Process struct {
ID string `json:"id"` // pid // parse from key path
JobID string `json:"jobId"` ID string `json:"id"` // pid
Group string `json:"group"` JobID string `json:"jobId"`
NodeID string `json:"nodeId"` Group string `json:"group"`
Time time.Time `json:"time"` // 开始执行时间 NodeID string `json:"nodeId"`
Killed bool `json:"killed"` // 是否强制杀死 // parse from value
ProcessVal
running int32 running int32
hasPut int32 hasPut int32
@ -141,6 +142,11 @@ type Process struct {
done chan struct{} done chan struct{}
} }
type ProcessVal struct {
Time time.Time `json:"time"` // 开始执行时间
Killed bool `json:"killed"` // 是否强制杀死
}
func GetProcFromKey(key string) (proc *Process, err error) { func GetProcFromKey(key string) (proc *Process, err error) {
ss := strings.Split(key, "/") ss := strings.Split(key, "/")
var sslen = len(ss) var sslen = len(ss)
@ -163,16 +169,15 @@ func (p *Process) Key() string {
} }
func (p *Process) Val() (string, error) { func (p *Process) Val() (string, error) {
val := struct { b, err := json.Marshal(&p.ProcessVal)
Time string `json:"time"` if err != nil {
Killed bool `json:"killed"` return "", err
}{p.Time.Format(time.RFC3339), p.Killed} }
str, err := json.Marshal(val) return string(b), nil
return string(str), err
} }
// 获取点正在执行任务的数量 // 获取点正在执行任务的数量
func (j *Job) CountRunning() (int64, error) { func (j *Job) CountRunning() (int64, error) {
resp, err := DefalutClient.Get(conf.Config.Proc+j.runOn+"/"+j.Group+"/"+j.ID, client.WithPrefix(), client.WithCountOnly()) resp, err := DefalutClient.Get(conf.Config.Proc+j.runOn+"/"+j.Group+"/"+j.ID, client.WithPrefix(), client.WithCountOnly())
if err != nil { if err != nil {

View File

@ -351,9 +351,13 @@ func (j *Job) GetExecutingJob(ctx *Context) {
} }
val := string(gresp.Kvs[i].Value) val := string(gresp.Kvs[i].Value)
var p cronsun.Process var pv = &cronsun.ProcessVal{}
json.Unmarshal([]byte(val), &p) err = json.Unmarshal([]byte(val), pv)
if err != nil {
log.Errorf("Failed to unmarshal ProcessVal from val: %s", err.Error())
continue
}
proc.ProcessVal = *pv
list = append(list, proc) list = append(list, proc)
} }
@ -362,45 +366,56 @@ func (j *Job) GetExecutingJob(ctx *Context) {
} }
func (j *Job) KillExecutingJob(ctx *Context) { func (j *Job) KillExecutingJob(ctx *Context) {
vars := mux.Vars(ctx.R) proc := &cronsun.Process{
id := strings.TrimSpace(vars["id"]) ID: getStringVal("pid", ctx.R),
id = strings.Replace(id, ".", "/", -1) JobID: getStringVal("job", ctx.R),
Group: getStringVal("group", ctx.R),
NodeID: getStringVal("node", ctx.R),
}
procKey := conf.Config.Proc + id if proc.ID == "" || proc.JobID == "" || proc.Group == "" || proc.NodeID == "" {
outJSONWithCode(ctx.W, http.StatusBadRequest, "Invalid process info.")
return
}
procKey := proc.Key()
resp, err := cronsun.DefalutClient.Get(procKey) resp, err := cronsun.DefalutClient.Get(procKey)
if err != nil { if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return return
} }
if len(resp.Kvs) < 1 { if len(resp.Kvs) < 1 {
outJSONWithCode(ctx.W, http.StatusInternalServerError, nil) outJSONWithCode(ctx.W, http.StatusNotFound, "Porcess not found")
return return
} }
var procVal cronsun.Process var procVal = &cronsun.ProcessVal{}
err = json.Unmarshal(resp.Kvs[0].Value, &procVal) err = json.Unmarshal(resp.Kvs[0].Value, &procVal)
if err != nil { if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return return
} }
if procVal.Killed {
outJSONWithCode(ctx.W, http.StatusOK, "Killing process")
return
}
procVal.Killed = true procVal.Killed = true
newVal, err := json.Marshal(procVal) proc.ProcessVal = *procVal
str, err := proc.Val()
if err != nil { if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return return
} }
_, err = cronsun.DefalutClient.Put(procKey, string(newVal)) _, err = cronsun.DefalutClient.Put(procKey, str)
if err != nil { if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return return
} }
outJSONWithCode(ctx.W, http.StatusOK, "杀死进程成功") outJSONWithCode(ctx.W, http.StatusOK, "Killing process")
} }
type ProcFetchOptions struct { type ProcFetchOptions struct {

View File

@ -78,7 +78,7 @@ func initRouters() (s *http.Server, err error) {
// kill an executing job // kill an executing job
h = NewAuthHandler(jobHandler.KillExecutingJob, cronsun.Developer) h = NewAuthHandler(jobHandler.KillExecutingJob, cronsun.Developer)
subrouter.Handle("/job/executing/{id}", h).Methods("DELETE") subrouter.Handle("/job/executing", h).Methods("DELETE")
// get job log list // get job log list
h = NewAuthHandler(jobLogHandler.GetList, cronsun.Reporter) h = NewAuthHandler(jobLogHandler.GetList, cronsun.Reporter)

View File

@ -105,9 +105,13 @@ export default {
killProc(proc, index) { killProc(proc, index) {
if (!confirm(this.$L("whether to kill the process"))) return; if (!confirm(this.$L("whether to kill the process"))) return;
var vm = this
var id = proc.nodeId + "." + proc.group + "." + proc.jobId + "." + proc.id; var params = []
this.$rest.DELETE('job/executing/' + id) params.push('node='+proc.nodeId)
params.push('group='+proc.group)
params.push('job='+proc.jobId)
params.push('pid='+proc.id)
this.$rest.DELETE('job/executing?' + params.join('&'))
.onsucceed(200, (resp) => { .onsucceed(200, (resp) => {
this.executings.splice(index, 1); this.executings.splice(index, 1);
}) })