From e2974c40d6866acfb95db0c00ba901a6341141f4 Mon Sep 17 00:00:00 2001 From: Doflatango Date: Fri, 10 Mar 2017 11:06:40 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9F=A5=E7=9C=8B=E6=89=A7=E8=A1=8C=E4=B8=AD?= =?UTF-8?q?=E7=9A=84=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models/job.go | 1 - models/proc.go | 33 ++++-- web/base.go | 10 ++ web/job.go | 62 +++++++++++ web/routers.go | 4 + web/ui/src/components/Job.vue | 5 +- web/ui/src/components/JobExecuting.vue | 140 +++++++++++++++++++++++++ web/ui/src/libraries/functions.js | 7 +- web/ui/src/main.js | 2 + 9 files changed, 253 insertions(+), 11 deletions(-) create mode 100644 web/ui/src/components/JobExecuting.vue diff --git a/models/job.go b/models/job.go index a27ee23..50123df 100644 --- a/models/job.go +++ b/models/job.go @@ -220,7 +220,6 @@ func (j *Job) Run() { cmd = exec.Command(j.cmd[0], j.cmd[1:]...) } cmd.SysProcAttr = sysProcAttr - var b bytes.Buffer cmd.Stdout = &b cmd.Stderr = &b diff --git a/models/proc.go b/models/proc.go index 34761be..bcb2417 100644 --- a/models/proc.go +++ b/models/proc.go @@ -2,11 +2,13 @@ package models import ( "context" + "fmt" "sync" "time" client "github.com/coreos/etcd/clientv3" + "strings" "sunteng/commons/log" "sunteng/cronsun/conf" ) @@ -88,7 +90,7 @@ func (l *leaseID) set() error { } func (l *leaseID) keepAlive() { - duration := time.Duration(l.ttl) + duration := time.Duration(l.ttl) * time.Second timer := time.NewTimer(duration) for { select { @@ -124,19 +126,36 @@ func (l *leaseID) keepAlive() { } // 当前执行中的任务信息 -// key: /cronsun/proc/node/job id/pid +// key: /cronsun/proc/node/group/jobId/pid // value: 开始执行时间 // key 会自动过期,防止进程意外退出后没有清除相关 key,过期时间可配置 type Process struct { - ID string `json:"id"` - JobID string `json:"job_id"` + ID string `json:"id"` // pid + JobID string `json:"jobId"` Group string `json:"group"` - NodeID string `json:"node_id"` - Time time.Time `json:"name"` // 开始执行时间 + NodeID string `json:"nodeId"` + Time time.Time `json:"time"` // 开始执行时间 running bool } +func GetProcFromKey(key string) (proc *Process, err error) { + ss := strings.Split(key, "/") + var sslen = len(ss) + if sslen < 5 { + err = fmt.Errorf("invalid proc key [%s]", err.Error()) + return + } + + proc = &Process{ + ID: ss[sslen-1], + JobID: ss[sslen-2], + Group: ss[sslen-3], + NodeID: ss[sslen-4], + } + return +} + func (p *Process) Key() string { return conf.Config.Proc + p.NodeID + "/" + p.Group + "/" + p.JobID + "/" + p.ID } @@ -147,7 +166,7 @@ func (p *Process) Val() string { // 获取结点正在执行任务的数量 func (j *Job) CountRunning() (int64, error) { - resp, err := DefalutClient.Get(conf.Config.Proc + j.runOn + "/" + j.Group + "/" + j.ID) + resp, err := DefalutClient.Get(conf.Config.Proc+j.runOn+"/"+j.Group+"/"+j.ID, client.WithPrefix(), client.WithCountOnly()) if err != nil { return 0, err } diff --git a/web/base.go b/web/base.go index e33b8ed..5641a09 100644 --- a/web/base.go +++ b/web/base.go @@ -53,3 +53,13 @@ func outJSONWithCode(w http.ResponseWriter, httpCode int, data interface{}) { func outJSON(w http.ResponseWriter, data interface{}) { outJSONWithCode(w, http.StatusOK, data) } + +func InStringArray(k string, ss []string) bool { + for i := range ss { + if ss[i] == k { + return true + } + } + + return false +} diff --git a/web/job.go b/web/job.go index 52f53ad..3e03e8d 100644 --- a/web/job.go +++ b/web/job.go @@ -12,6 +12,7 @@ import ( "sunteng/commons/log" "sunteng/cronsun/conf" "sunteng/cronsun/models" + "time" ) type Job struct{} @@ -196,3 +197,64 @@ func (j *Job) GetList(w http.ResponseWriter, r *http.Request) { outJSON(w, jobList) } + +func (j *Job) GetExecutingJob(w http.ResponseWriter, r *http.Request) { + opt := &ProcFetchOptions{ + Groups: GetStringArrayFromQuery("groups", ",", r), + NodeIds: GetStringArrayFromQuery("nodes", ",", r), + JobIds: GetStringArrayFromQuery("jobs", ",", r), + } + + gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix()) + if err != nil { + outJSONWithCode(w, http.StatusInternalServerError, err.Error()) + return + } + + var list = make([]*models.Process, 0, 8) + for i := range gresp.Kvs { + proc, err := models.GetProcFromKey(string(gresp.Kvs[i].Key)) + if err != nil { + log.Error("Failed to unmarshal Proc from key: ", err.Error()) + continue + } + + if !opt.Match(proc) { + continue + } + proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value)) + list = append(list, proc) + } + + sort.Sort(ByProcTime(list)) + outJSON(w, list) +} + +type ProcFetchOptions struct { + Groups []string + NodeIds []string + JobIds []string +} + +func (opt *ProcFetchOptions) Match(proc *models.Process) bool { + if len(opt.Groups) > 0 && !InStringArray(proc.Group, opt.Groups) { + return false + } + + if len(opt.JobIds) > 0 && !InStringArray(proc.JobID, opt.JobIds) { + return false + + } + + if len(opt.NodeIds) > 0 && !InStringArray(proc.NodeID, opt.NodeIds) { + return false + } + + return true +} + +type ByProcTime []*models.Process + +func (a ByProcTime) Len() int { return len(a) } +func (a ByProcTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a ByProcTime) Less(i, j int) bool { return a[i].Time.After(a[j].Time) } diff --git a/web/routers.go b/web/routers.go index 2c9dff1..cbb265c 100644 --- a/web/routers.go +++ b/web/routers.go @@ -38,6 +38,10 @@ func InitRouters() (s *http.Server, err error) { h = BaseHandler{Handle: jobHandler.DeleteJob} subrouter.Handle("/job/{group}-{id}", h).Methods("DELETE") + // query executing job + h = BaseHandler{Handle: jobHandler.GetExecutingJob} + subrouter.Handle("/job/executing", h).Methods("GET") + // get job log list h = BaseHandler{Handle: jobLogHandler.GetList} subrouter.Handle("/logs", h).Methods("GET") diff --git a/web/ui/src/components/Job.vue b/web/ui/src/components/Job.vue index bfe86e0..ef51f70 100644 --- a/web/ui/src/components/Job.vue +++ b/web/ui/src/components/Job.vue @@ -3,9 +3,10 @@