强杀进程

pull/98/head^2
xyb 2018-08-13 18:32:04 +08:00
parent 9bb382dfcf
commit b7a58396fd
12 changed files with 493 additions and 116 deletions

46
job.go
View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"os/exec"
"os/user"
@ -416,31 +417,17 @@ func (j *Job) Run() bool {
cmd *exec.Cmd
proc *Process
sysProcAttr *syscall.SysProcAttr
err error
)
t := time.Now()
// 用户权限控制
if len(j.User) > 0 {
u, err := user.Lookup(j.User)
sysProcAttr, err = j.CreateCmdAttr()
if err != nil {
j.Fail(t, err.Error())
return false
}
uid, err := strconv.Atoi(u.Uid)
if err != nil {
j.Fail(t, "not support run with user on windows")
return false
}
if uid != _Uid {
gid, _ := strconv.Atoi(u.Gid)
sysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
}
}
}
// 超时控制
@ -722,3 +709,28 @@ func (j *Job) ShortName() string {
return string(names[:10]) + "..."
}
func (j *Job) CreateCmdAttr() (*syscall.SysProcAttr, error) {
var sysProcAttr *syscall.SysProcAttr
u, err := user.Lookup(j.User)
if err != nil {
return nil, err
}
uid, err := strconv.Atoi(u.Uid)
if err != nil {
return nil, errors.New("not support run with user on windows")
}
if uid != _Uid {
gid, _ := strconv.Atoi(u.Gid)
sysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
}
}
return sysProcAttr, nil
}

View File

@ -7,10 +7,15 @@ import (
"path"
"strconv"
"strings"
"syscall"
"time"
client "github.com/coreos/etcd/clientv3"
"os/exec"
"encoding/json"
"github.com/shunfei/cronsun"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
@ -420,6 +425,35 @@ func (n *Node) groupRmNode(g, og *cronsun.Group) {
n.groups[g.ID] = g
}
func (n *Node) KillExcutingProc(process *cronsun.Process) {
var (
cmd *exec.Cmd
sysProcAttr *syscall.SysProcAttr
err error
)
job, ok := n.jobs[process.JobID]
if !ok {
log.Warnf("jobId:[%s] is not exist!\n", process.JobID)
return
}
if job.User != "" {
sysProcAttr, err = job.CreateCmdAttr()
if err != nil {
log.Warnf("process:[%s] createCmdAttr failed, error:[%s]\n", process.ID, err)
return
}
}
cmd = exec.Command("kill", "-9", process.ID)
cmd.SysProcAttr = sysProcAttr
if err := cmd.Run(); err != nil {
log.Warnf("process:[%s] force kill failed, error:[%s]\n", process.ID, err)
return
}
}
func (n *Node) watchJobs() {
rch := cronsun.WatchJobs()
for wresp := range rch {
@ -452,6 +486,34 @@ func (n *Node) watchJobs() {
}
}
func (n *Node) watchExcutingProc() {
rch := cronsun.WatchProcs(n.ID)
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsModify():
key := string(ev.Kv.Key)
process, err := cronsun.GetProcFromKey(key)
if err != nil {
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
}
val := string(ev.Kv.Value)
err = json.Unmarshal([]byte(val), process)
if err != nil {
continue
}
if process.Killed {
n.KillExcutingProc(process)
}
}
}
}
}
func (n *Node) watchGroups() {
rch := cronsun.WatchGroups()
for wresp := range rch {
@ -531,6 +593,7 @@ func (n *Node) Run() (err error) {
n.Cron.Start()
go n.watchJobs()
go n.watchExcutingProc()
go n.watchGroups()
go n.watchOnce()
go n.watchCsctl()

16
proc.go
View File

@ -9,6 +9,8 @@ import (
client "github.com/coreos/etcd/clientv3"
"encoding/json"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
)
@ -131,7 +133,8 @@ type Process struct {
JobID string `json:"jobId"`
Group string `json:"group"`
NodeID string `json:"nodeId"`
Time time.Time `json:"time"` // 开始执行时间
Time time.Time `json:"time"` // 开始执行时间
Killed bool `json:"killed"` // 是否强制杀死
running int32
hasPut int32
@ -161,7 +164,12 @@ func (p *Process) Key() string {
}
func (p *Process) Val() string {
return p.Time.Format(time.RFC3339)
val := map[string]interface{}{
"time": p.Time.Format(time.RFC3339),
"killed": p.Killed,
}
str, _ := json.Marshal(val)
return string(str)
}
// 获取结点正在执行任务的数量
@ -254,3 +262,7 @@ func (p *Process) Stop() {
log.Warnf("proc del[%s] err: %s", p.Key(), err.Error())
}
}
func WatchProcs(nid string) client.WatchChan {
return DefalutClient.Watch(conf.Config.Proc+nid, client.WithPrefix())
}

View File

@ -350,7 +350,12 @@ func (j *Job) GetExecutingJob(ctx *Context) {
if !opt.Match(proc) {
continue
}
proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value))
val := string(gresp.Kvs[i].Value)
m := make(map[string]interface{})
json.Unmarshal([]byte(val), &m)
proc.Time, _ = time.Parse(time.RFC3339, m["time"].(string))
list = append(list, proc)
}
@ -358,6 +363,44 @@ func (j *Job) GetExecutingJob(ctx *Context) {
outJSON(ctx.W, list)
}
func (j *Job) KillExecutingJob(ctx *Context) {
vars := mux.Vars(ctx.R)
id := strings.TrimSpace(vars["id"])
id = strings.Replace(id, ".", "/", -1)
procKey := conf.Config.Proc + id
resp, err := cronsun.DefalutClient.Get(procKey)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
if len(resp.Kvs) < 1 {
outJSONWithCode(ctx.W, http.StatusInternalServerError, "进程不存在")
return
}
procVal := make(map[string]interface{})
err = json.Unmarshal(resp.Kvs[0].Value, &procVal)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
procVal["killed"] = true
newVal, _ := json.Marshal(procVal)
_, err = cronsun.DefalutClient.Put(procKey, string(newVal))
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
outJSONWithCode(ctx.W, http.StatusOK, "杀死进程成功")
}
type ProcFetchOptions struct {
Groups []string
NodeIds []string

View File

@ -76,6 +76,10 @@ func initRouters() (s *http.Server, err error) {
h = NewAuthHandler(jobHandler.GetExecutingJob, cronsun.Reporter)
subrouter.Handle("/job/executing", h).Methods("GET")
// kill an executing job
h = NewAuthHandler(jobHandler.KillExecutingJob, cronsun.Developer)
subrouter.Handle("/job/executing/{id}", h).Methods("DELETE")
// get job log list
h = NewAuthHandler(jobLogHandler.GetList, cronsun.Reporter)
subrouter.Handle("/logs", h).Methods("GET")

File diff suppressed because one or more lines are too long

View File

@ -61,7 +61,7 @@
<div id="app">
<div id="initloader"></div>
</div>
<script src="build.js?v=6fe9c2f"></script>
<script src="build.js?v=9bb382d"></script>
</body>
</html>

View File

@ -13,7 +13,7 @@
"chart.js": "^2.5.0",
"jquery": "^3.1.1",
"jquery.cookie": "^1.4.1",
"semantic-ui": "^2.2.7",
"semantic-ui": "^2.3.3",
"vue": "^2.3.4",
"vue-router": "^2.2.1",
"vuex": "^2.3.1"
@ -25,6 +25,7 @@
"cross-env": "^3.0.0",
"css-loader": "^0.26.1",
"file-loader": "^0.9.0",
"gulp-header": "^2.0.5",
"style-loader": "^0.13.1",
"vue-loader": "^10.0.0",
"vue-template-compiler": "^2.3.4",

View File

@ -1,5 +1,6 @@
<style scope>
.clearfix:after {content:""; clear:both; display:table;}
.kill-proc-btn { color:red;cursor: pointer;}
</style>
<template>
<div>
@ -33,6 +34,7 @@
<th class="center aligned">{{$L('node')}}</th>
<th class="center aligned">{{$L('process ID')}}</th>
<th class="center aligned">{{$L('starting time')}}</th>
<th class="center aligned">{{$L('operation')}}</th>
</tr>
</thead>
<tbody>
@ -42,6 +44,7 @@
<td class="center aligned">{{$store.getters.hostshows(proc.nodeId)}}</td>
<td class="center aligned">{{proc.id}}</td>
<td class="center aligned">{{proc.time}}</td>
<td class="center aligned"><a class="kill-proc-btn" v-on:click="killProc(proc, index)">{{$L('kill process')}}</a></td>
</tr>
</tbody>
</table>
@ -100,6 +103,14 @@ export default {
this.$router.push('/job/executing?'+this.buildQuery());
},
killProc(proc, index) {
if (confirm("确认杀死该进程?")) {
var id = proc.nodeId + "." + proc.group + "." + proc.jobId + "." + proc.id;
this.$rest.DELETE('job/executing/' + id).do();
this.executings.splice(index, 1);
}
},
buildQuery(){
var params = [];
if (this.groups && this.groups.length > 0) params.push('groups='+this.groups.join(','));

View File

@ -54,6 +54,7 @@ var language = {
'view job list': 'View job list',
'starting time': 'Starting time',
'process ID': 'Process ID',
'kill process': 'Kill process',
'group filter': 'Group filter',
'node filter': 'Node filter',
'select a group': 'Select a group',

View File

@ -55,6 +55,7 @@ var language = {
'view job list': '查看任务列表',
'starting time': '开始时间',
'process ID': '进程ID',
'kill process': '杀死进程',
'group filter': '分组过滤',
'node filter': '节点过滤',

View File

@ -4,9 +4,27 @@ require('semantic-ui/dist/semantic.min.css');
import store from './vuex/store';
import Vue from 'vue';
import Lang from './i18n/language';
// global restful client
import Rest from './libraries/rest-client.js';
import VueRouter from 'vue-router';
import App from './App.vue';
import Dash from './components/Dash.vue';
import Log from './components/Log.vue';
import LogDetail from './components/LogDetail.vue';
import Job from './components/Job.vue';
import JobEdit from './components/JobEdit.vue';
import JobExecuting from './components/JobExecuting.vue';
import Node from './components/Node.vue';
import NodeGroup from './components/NodeGroup.vue';
import NodeGroupEdit from './components/NodeGroupEdit.vue';
import Account from './components/Account.vue';
import AccountEdit from './components/AccountEdit.vue';
import Profile from './components/Profile.vue';
import Login from './components/Login.vue';
Vue.config.debug = true;
import Lang from './i18n/language';
Vue.use((Vue) => {
Vue.prototype.$L = Lang.L
Vue.prototype.$Lang = Lang
@ -18,35 +36,34 @@ Vue.use((Vue) => {
Vue.prototype.$bus = bus;
});
// global restful client
import Rest from './libraries/rest-client.js';
var restApi = new Rest('/v1/', (msg) => {
bus.$emit('error', msg);
}, (msg) => {
bus.$emit('error', msg);
}, {
401: (data, xhr) => { bus.$emit('goLogin') }
});
401: (data, xhr) => {
bus.$emit('goLogin')
}
});
Vue.use((Vue, options) => {
Vue.prototype.$rest = restApi;
}, null);
import VueRouter from 'vue-router';
Vue.use(VueRouter);
Vue.use((Vue) => {
Vue.prototype.$loadConfiguration = () => {
restApi.GET('configurations').
onsucceed(200, (resp) => {
const Config = (Vue, options) => {
Vue.prototype.$appConfig = resp;
}
Vue.use(Config);
bus.$emit('conf_loaded', resp);
}).onfailed((data, xhr) => {
var msg = data ? data : xhr.status + ' ' + xhr.statusText;
bus.$emit('error', msg);
}).do();
restApi.GET('configurations').onsucceed(200, (resp) => {
const Config = (Vue, options) => {
Vue.prototype.$appConfig = resp;
}
Vue.use(Config);
bus.$emit('conf_loaded', resp);
}).onfailed((data, xhr) => {
var msg = data ? data : xhr.status + ' ' + xhr.statusText;
bus.$emit('error', msg);
}).do();
}
});
@ -73,38 +90,23 @@ const onConfigLoaded = (Vue, options) => {
}
Vue.use(onConfigLoaded);
import App from './App.vue';
import Dash from './components/Dash.vue';
import Log from './components/Log.vue';
import LogDetail from './components/LogDetail.vue';
import Job from './components/Job.vue';
import JobEdit from './components/JobEdit.vue';
import JobExecuting from './components/JobExecuting.vue';
import Node from './components/Node.vue';
import NodeGroup from './components/NodeGroup.vue';
import NodeGroupEdit from './components/NodeGroupEdit.vue';
import Account from './components/Account.vue';
import AccountEdit from './components/AccountEdit.vue';
import Profile from './components/Profile.vue';
import Login from './components/Login.vue';
var routes = [
{ path: '/', component: Dash },
{ path: '/log', component: Log },
{ path: '/log/:id', component: LogDetail },
{ path: '/job', component: Job },
{ path: '/job/create', component: JobEdit },
{ path: '/job/edit/:group/:id', component: JobEdit },
{ path: '/job/executing', component: JobExecuting },
{ path: '/node', component: Node },
{ path: '/node/group', component: NodeGroup },
{ path: '/node/group/create', component: NodeGroupEdit },
{ path: '/node/group/:id', component: NodeGroupEdit },
{ path: '/admin/account/list', component: Account },
{ path: '/admin/account/add', component: AccountEdit },
{ path: '/admin/account/edit', component: AccountEdit },
{ path: '/user/setpwd', component: Profile },
{ path: '/login', component: Login }
{path: '/', component: Dash},
{path: '/log', component: Log},
{path: '/log/:id', component: LogDetail},
{path: '/job', component: Job},
{path: '/job/create', component: JobEdit},
{path: '/job/edit/:group/:id', component: JobEdit},
{path: '/job/executing', component: JobExecuting},
{path: '/node', component: Node},
{path: '/node/group', component: NodeGroup},
{path: '/node/group/create', component: NodeGroupEdit},
{path: '/node/group/:id', component: NodeGroupEdit},
{path: '/admin/account/list', component: Account},
{path: '/admin/account/add', component: AccountEdit},
{path: '/admin/account/edit', component: AccountEdit},
{path: '/user/setpwd', component: Profile},
{path: '/login', component: Login}
];
var router = new VueRouter({
@ -118,22 +120,20 @@ bus.$on('goLogin', () => {
});
var initConf = new Promise((resolve) => {
restApi.GET('session?check=1').
onsucceed(200, (resp) => {
restApi.GET('session?check=1').onsucceed(200, (resp) => {
store.commit('enabledAuth', resp.enabledAuth);
store.commit('setEmail', resp.email);
store.commit('setRole', resp.role);
restApi.GET('version').onsucceed(200, (resp)=>{
restApi.GET('version').onsucceed(200, (resp) => {
store.commit('setVersion', resp);
}).do();
restApi.GET('configurations').
onsucceed(200, (resp) => {
restApi.GET('configurations').onsucceed(200, (resp) => {
Vue.use((Vue) => Vue.prototype.$appConfig = resp);
bus.$emit('conf_loaded', resp);
restApi.GET('nodes').onsucceed(200, (resp)=>{
restApi.GET('nodes').onsucceed(200, (resp) => {
var nodes = {};
for (var i in resp) {
nodes[resp[i].id] = resp[i];
@ -153,8 +153,7 @@ var initConf = new Promise((resolve) => {
}
router.push('/login');
resolve()
}).
do();
}).do();
})
initConf.then(() => {