diff --git a/bin/node/server.go b/bin/node/server.go index b1a8693..8361ba3 100644 --- a/bin/node/server.go +++ b/bin/node/server.go @@ -18,7 +18,7 @@ import ( var ( level = flag.Int("l", 0, "log level, -1:debug, 0:info, 1:warn, 2:error") - confFile = flag.String("conf", "conf/files/base.json", "config file path") + confFile = flag.String("conf", "../../conf/files/base.json", "config file path") ) func main() { diff --git a/bin/web/server.go b/bin/web/server.go index 2c2e7e3..e48460a 100644 --- a/bin/web/server.go +++ b/bin/web/server.go @@ -19,7 +19,7 @@ import ( var ( level = flag.Int("l", 0, "log level, -1:debug, 0:info, 1:warn, 2:error") - confFile = flag.String("conf", "conf/files/base.json", "config file path") + confFile = flag.String("conf", "../../conf/files/base.json", "config file path") network = flag.String("network", "", "network protocol of listen address: ipv4/ipv6, or empty use both") ) diff --git a/job.go b/job.go index 512eb71..730863d 100644 --- a/job.go +++ b/job.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "fmt" "os/exec" "os/user" @@ -416,31 +417,17 @@ func (j *Job) Run() bool { cmd *exec.Cmd proc *Process sysProcAttr *syscall.SysProcAttr + err error ) t := time.Now() - // 用户权限控制 + if len(j.User) > 0 { - u, err := user.Lookup(j.User) + sysProcAttr, err = j.CreateCmdAttr() if err != nil { j.Fail(t, err.Error()) return false } - - uid, err := strconv.Atoi(u.Uid) - if err != nil { - j.Fail(t, "not support run with user on windows") - return false - } - if uid != _Uid { - gid, _ := strconv.Atoi(u.Gid) - sysProcAttr = &syscall.SysProcAttr{ - Credential: &syscall.Credential{ - Uid: uint32(uid), - Gid: uint32(gid), - }, - } - } } // 超时控制 @@ -722,3 +709,28 @@ func (j *Job) ShortName() string { return string(names[:10]) + "..." } + +func (j *Job) CreateCmdAttr() (*syscall.SysProcAttr, error) { + var sysProcAttr *syscall.SysProcAttr + + u, err := user.Lookup(j.User) + if err != nil { + return nil, err + } + + uid, err := strconv.Atoi(u.Uid) + if err != nil { + return nil, errors.New("not support run with user on windows") + } + if uid != _Uid { + gid, _ := strconv.Atoi(u.Gid) + sysProcAttr = &syscall.SysProcAttr{ + Credential: &syscall.Credential{ + Uid: uint32(uid), + Gid: uint32(gid), + }, + } + } + + return sysProcAttr, nil +} diff --git a/node/node.go b/node/node.go index 64a201b..30c19ce 100644 --- a/node/node.go +++ b/node/node.go @@ -7,10 +7,15 @@ import ( "path" "strconv" "strings" + "syscall" "time" client "github.com/coreos/etcd/clientv3" + "os/exec" + + "encoding/json" + "github.com/shunfei/cronsun" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" @@ -420,6 +425,35 @@ func (n *Node) groupRmNode(g, og *cronsun.Group) { n.groups[g.ID] = g } +func (n *Node) KillExcutingProc(process *cronsun.Process) { + var ( + cmd *exec.Cmd + sysProcAttr *syscall.SysProcAttr + err error + ) + + job, ok := n.jobs[process.JobID] + if !ok { + log.Warnf("jobId:[%s] is not exist!\n", process.JobID) + return + } + + if job.User != "" { + sysProcAttr, err = job.CreateCmdAttr() + if err != nil { + log.Warnf("process:[%s] createCmdAttr failed, error:[%s]\n", process.ID, err) + return + } + } + + cmd = exec.Command("kill", "-9", process.ID) + cmd.SysProcAttr = sysProcAttr + if err := cmd.Run(); err != nil { + log.Warnf("process:[%s] force kill failed, error:[%s]\n", process.ID, err) + return + } +} + func (n *Node) watchJobs() { rch := cronsun.WatchJobs() for wresp := range rch { @@ -452,6 +486,34 @@ func (n *Node) watchJobs() { } } +func (n *Node) watchExcutingProc() { + rch := cronsun.WatchProcs(n.ID) + + for wresp := range rch { + for _, ev := range wresp.Events { + switch { + case ev.IsModify(): + key := string(ev.Kv.Key) + process, err := cronsun.GetProcFromKey(key) + if err != nil { + log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) + continue + } + + val := string(ev.Kv.Value) + err = json.Unmarshal([]byte(val), process) + if err != nil { + continue + } + + if process.Killed { + n.KillExcutingProc(process) + } + } + } + } +} + func (n *Node) watchGroups() { rch := cronsun.WatchGroups() for wresp := range rch { @@ -531,6 +593,7 @@ func (n *Node) Run() (err error) { n.Cron.Start() go n.watchJobs() + go n.watchExcutingProc() go n.watchGroups() go n.watchOnce() go n.watchCsctl() diff --git a/proc.go b/proc.go index 07316ba..5346d55 100644 --- a/proc.go +++ b/proc.go @@ -9,6 +9,8 @@ import ( client "github.com/coreos/etcd/clientv3" + "encoding/json" + "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" ) @@ -131,7 +133,8 @@ type Process struct { JobID string `json:"jobId"` Group string `json:"group"` NodeID string `json:"nodeId"` - Time time.Time `json:"time"` // 开始执行时间 + Time time.Time `json:"time"` // 开始执行时间 + Killed bool `json:"killed"` // 是否强制杀死 running int32 hasPut int32 @@ -161,7 +164,12 @@ func (p *Process) Key() string { } func (p *Process) Val() string { - return p.Time.Format(time.RFC3339) + val := map[string]interface{}{ + "time": p.Time.Format(time.RFC3339), + "killed": p.Killed, + } + str, _ := json.Marshal(val) + return string(str) } // 获取结点正在执行任务的数量 @@ -254,3 +262,7 @@ func (p *Process) Stop() { log.Warnf("proc del[%s] err: %s", p.Key(), err.Error()) } } + +func WatchProcs(nid string) client.WatchChan { + return DefalutClient.Watch(conf.Config.Proc+nid, client.WithPrefix()) +} diff --git a/web/job.go b/web/job.go index 5ac5a7e..2b4c980 100644 --- a/web/job.go +++ b/web/job.go @@ -350,7 +350,12 @@ func (j *Job) GetExecutingJob(ctx *Context) { if !opt.Match(proc) { continue } - proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value)) + + val := string(gresp.Kvs[i].Value) + m := make(map[string]interface{}) + json.Unmarshal([]byte(val), &m) + proc.Time, _ = time.Parse(time.RFC3339, m["time"].(string)) + list = append(list, proc) } @@ -358,6 +363,44 @@ func (j *Job) GetExecutingJob(ctx *Context) { outJSON(ctx.W, list) } +func (j *Job) KillExecutingJob(ctx *Context) { + vars := mux.Vars(ctx.R) + id := strings.TrimSpace(vars["id"]) + id = strings.Replace(id, ".", "/", -1) + + procKey := conf.Config.Proc + id + resp, err := cronsun.DefalutClient.Get(procKey) + + if err != nil { + outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) + return + } + + if len(resp.Kvs) < 1 { + outJSONWithCode(ctx.W, http.StatusInternalServerError, "进程不存在") + return + } + + procVal := make(map[string]interface{}) + err = json.Unmarshal(resp.Kvs[0].Value, &procVal) + + if err != nil { + outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) + return + } + + procVal["killed"] = true + newVal, _ := json.Marshal(procVal) + + _, err = cronsun.DefalutClient.Put(procKey, string(newVal)) + if err != nil { + outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error()) + return + } + + outJSONWithCode(ctx.W, http.StatusOK, "杀死进程成功") +} + type ProcFetchOptions struct { Groups []string NodeIds []string diff --git a/web/routers.go b/web/routers.go index 04b081e..b450433 100644 --- a/web/routers.go +++ b/web/routers.go @@ -76,6 +76,10 @@ func initRouters() (s *http.Server, err error) { h = NewAuthHandler(jobHandler.GetExecutingJob, cronsun.Reporter) subrouter.Handle("/job/executing", h).Methods("GET") + // kill an executing job + h = NewAuthHandler(jobHandler.KillExecutingJob, cronsun.Developer) + subrouter.Handle("/job/executing/{id}", h).Methods("DELETE") + // get job log list h = NewAuthHandler(jobLogHandler.GetList, cronsun.Reporter) subrouter.Handle("/logs", h).Methods("GET") diff --git a/web/static_assets.go b/web/static_assets.go index f81be92..152fdb8 100644 --- a/web/static_assets.go +++ b/web/static_assets.go @@ -308,15 +308,15 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "build.js": buildJs, + "build.js": buildJs, "build.js.map": buildJsMap, - "flags.png": flagsPng, - "icons.eot": iconsEot, - "icons.svg": iconsSvg, - "icons.ttf": iconsTtf, - "icons.woff": iconsWoff, - "icons.woff2": iconsWoff2, - "index.html": indexHtml, + "flags.png": flagsPng, + "icons.eot": iconsEot, + "icons.svg": iconsSvg, + "icons.ttf": iconsTtf, + "icons.woff": iconsWoff, + "icons.woff2": iconsWoff2, + "index.html": indexHtml, } // AssetDir returns the file names below a certain @@ -358,16 +358,17 @@ type bintree struct { Func func() (*asset, error) Children map[string]*bintree } + var _bintree = &bintree{nil, map[string]*bintree{ - "build.js": &bintree{buildJs, map[string]*bintree{}}, + "build.js": &bintree{buildJs, map[string]*bintree{}}, "build.js.map": &bintree{buildJsMap, map[string]*bintree{}}, - "flags.png": &bintree{flagsPng, map[string]*bintree{}}, - "icons.eot": &bintree{iconsEot, map[string]*bintree{}}, - "icons.svg": &bintree{iconsSvg, map[string]*bintree{}}, - "icons.ttf": &bintree{iconsTtf, map[string]*bintree{}}, - "icons.woff": &bintree{iconsWoff, map[string]*bintree{}}, - "icons.woff2": &bintree{iconsWoff2, map[string]*bintree{}}, - "index.html": &bintree{indexHtml, map[string]*bintree{}}, + "flags.png": &bintree{flagsPng, map[string]*bintree{}}, + "icons.eot": &bintree{iconsEot, map[string]*bintree{}}, + "icons.svg": &bintree{iconsSvg, map[string]*bintree{}}, + "icons.ttf": &bintree{iconsTtf, map[string]*bintree{}}, + "icons.woff": &bintree{iconsWoff, map[string]*bintree{}}, + "icons.woff2": &bintree{iconsWoff2, map[string]*bintree{}}, + "index.html": &bintree{indexHtml, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory @@ -416,4 +417,3 @@ func _filePath(dir, name string) string { cannonicalName := strings.Replace(name, "\\", "/", -1) return filepath.Join(append([]string{dir}, strings.Split(cannonicalName, "/")...)...) } - diff --git a/web/ui/package.json b/web/ui/package.json index be28c27..ec59338 100644 --- a/web/ui/package.json +++ b/web/ui/package.json @@ -13,7 +13,7 @@ "chart.js": "^2.5.0", "jquery": "^3.1.1", "jquery.cookie": "^1.4.1", - "semantic-ui": "^2.2.7", + "semantic-ui": "^2.3.3", "vue": "^2.3.4", "vue-router": "^2.2.1", "vuex": "^2.3.1" @@ -25,6 +25,7 @@ "cross-env": "^3.0.0", "css-loader": "^0.26.1", "file-loader": "^0.9.0", + "gulp-header": "^2.0.5", "style-loader": "^0.13.1", "vue-loader": "^10.0.0", "vue-template-compiler": "^2.3.4", diff --git a/web/ui/src/components/JobExecuting.vue b/web/ui/src/components/JobExecuting.vue index 877786e..b2825e0 100644 --- a/web/ui/src/components/JobExecuting.vue +++ b/web/ui/src/components/JobExecuting.vue @@ -33,6 +33,7 @@