Merge pull request #99 from imxyb/feature/force-kill

Feature/force kill
pull/104/head
Doflatango 2018-09-07 22:18:41 +08:00 committed by GitHub
commit 27507a698f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 491 additions and 125 deletions

View File

@ -120,7 +120,7 @@ func (c *Client) DelLock(key string) error {
}
func IsValidAsKeyPath(s string) bool {
return strings.IndexByte(s, '/') == -1
return strings.IndexAny(s, "/\\") == -1
}
// etcdTimeoutContext return better error info

View File

@ -11,7 +11,7 @@ import (
client "github.com/coreos/etcd/clientv3"
"github.com/fsnotify/fsnotify"
"github.com/go-gomail/gomail"
"github.com/satori/go.uuid"
"github.com/gofrs/uuid"
"github.com/shunfei/cronsun/db"
"github.com/shunfei/cronsun/event"

View File

@ -8,8 +8,8 @@ var (
ErrEmptyJobName = errors.New("Name of job is empty.")
ErrEmptyJobCommand = errors.New("Command of job is empty.")
ErrIllegalJobId = errors.New("Invalid id that includes illegal characters such as '/'.")
ErrIllegalJobGroupName = errors.New("Invalid job group name that includes illegal characters such as '/'.")
ErrIllegalJobId = errors.New("Invalid id that includes illegal characters such as '/' '\\'.")
ErrIllegalJobGroupName = errors.New("Invalid job group name that includes illegal characters such as '/' '\\'.")
ErrEmptyNodeGroupName = errors.New("Name of node group is empty.")
ErrIllegalNodeGroupId = errors.New("Invalid node group id that includes illegal characters such as '/'.")

46
job.go
View File

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"os/exec"
"os/user"
@ -416,31 +417,17 @@ func (j *Job) Run() bool {
cmd *exec.Cmd
proc *Process
sysProcAttr *syscall.SysProcAttr
err error
)
t := time.Now()
// 用户权限控制
if len(j.User) > 0 {
u, err := user.Lookup(j.User)
sysProcAttr, err = j.CreateCmdAttr()
if err != nil {
j.Fail(t, err.Error())
return false
}
uid, err := strconv.Atoi(u.Uid)
if err != nil {
j.Fail(t, "not support run with user on windows")
return false
}
if uid != _Uid {
gid, _ := strconv.Atoi(u.Gid)
sysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
}
}
}
// 超时控制
@ -722,3 +709,28 @@ func (j *Job) ShortName() string {
return string(names[:10]) + "..."
}
func (j *Job) CreateCmdAttr() (*syscall.SysProcAttr, error) {
var sysProcAttr *syscall.SysProcAttr
u, err := user.Lookup(j.User)
if err != nil {
return nil, err
}
uid, err := strconv.Atoi(u.Uid)
if err != nil {
return nil, errors.New("not support run with user on windows")
}
if uid != _Uid {
gid, _ := strconv.Atoi(u.Gid)
sysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
}
}
return sysProcAttr, nil
}

View File

@ -1,16 +1,17 @@
package node
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"syscall"
"time"
client "github.com/coreos/etcd/clientv3"
"github.com/shunfei/cronsun"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
@ -420,6 +421,14 @@ func (n *Node) groupRmNode(g, og *cronsun.Group) {
n.groups[g.ID] = g
}
func (n *Node) KillExcutingProc(process *cronsun.Process) {
pid, _ := strconv.Atoi(process.ID)
if err := syscall.Kill(pid, syscall.SIGKILL); err != nil {
log.Warnf("process:[%d] force kill failed, error:[%s]\n", pid, err)
return
}
}
func (n *Node) watchJobs() {
rch := cronsun.WatchJobs()
for wresp := range rch {
@ -452,6 +461,34 @@ func (n *Node) watchJobs() {
}
}
func (n *Node) watchExcutingProc() {
rch := cronsun.WatchProcs(n.ID)
for wresp := range rch {
for _, ev := range wresp.Events {
switch {
case ev.IsModify():
key := string(ev.Kv.Key)
process, err := cronsun.GetProcFromKey(key)
if err != nil {
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
continue
}
val := string(ev.Kv.Value)
err = json.Unmarshal([]byte(val), process)
if err != nil {
continue
}
if process.Killed {
n.KillExcutingProc(process)
}
}
}
}
}
func (n *Node) watchGroups() {
rch := cronsun.WatchGroups()
for wresp := range rch {
@ -531,6 +568,7 @@ func (n *Node) Run() (err error) {
n.Cron.Start()
go n.watchJobs()
go n.watchExcutingProc()
go n.watchGroups()
go n.watchOnce()
go n.watchCsctl()

27
proc.go
View File

@ -9,6 +9,8 @@ import (
client "github.com/coreos/etcd/clientv3"
"encoding/json"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
)
@ -131,7 +133,8 @@ type Process struct {
JobID string `json:"jobId"`
Group string `json:"group"`
NodeID string `json:"nodeId"`
Time time.Time `json:"time"` // 开始执行时间
Time time.Time `json:"time"` // 开始执行时间
Killed bool `json:"killed"` // 是否强制杀死
running int32
hasPut int32
@ -160,8 +163,14 @@ func (p *Process) Key() string {
return conf.Config.Proc + p.NodeID + "/" + p.Group + "/" + p.JobID + "/" + p.ID
}
func (p *Process) Val() string {
return p.Time.Format(time.RFC3339)
func (p *Process) Val() (string, error) {
val := struct {
Time string `json:"time"`
Killed bool `json:"killed"`
}{p.Time.Format(time.RFC3339), p.Killed}
str, err := json.Marshal(val)
return string(str), err
}
// 获取结点正在执行任务的数量
@ -187,13 +196,17 @@ func (p *Process) put() (err error) {
}
id := lID.get()
val, err := p.Val()
if err != nil {
return err
}
if id < 0 {
if _, err = DefalutClient.Put(p.Key(), p.Val()); err != nil {
if _, err = DefalutClient.Put(p.Key(), val); err != nil {
return
}
}
_, err = DefalutClient.Put(p.Key(), p.Val(), client.WithLease(id))
_, err = DefalutClient.Put(p.Key(), val, client.WithLease(id))
return
}
@ -254,3 +267,7 @@ func (p *Process) Stop() {
log.Warnf("proc del[%s] err: %s", p.Key(), err.Error())
}
}
func WatchProcs(nid string) client.WatchChan {
return DefalutClient.Watch(conf.Config.Proc+nid, client.WithPrefix())
}

View File

@ -350,7 +350,12 @@ func (j *Job) GetExecutingJob(ctx *Context) {
if !opt.Match(proc) {
continue
}
proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value))
val := string(gresp.Kvs[i].Value)
var p cronsun.Process
json.Unmarshal([]byte(val), &p)
proc.Time, _ = time.Parse(time.RFC3339, p.Time)
list = append(list, proc)
}
@ -358,6 +363,48 @@ func (j *Job) GetExecutingJob(ctx *Context) {
outJSON(ctx.W, list)
}
func (j *Job) KillExecutingJob(ctx *Context) {
vars := mux.Vars(ctx.R)
id := strings.TrimSpace(vars["id"])
id = strings.Replace(id, ".", "/", -1)
procKey := conf.Config.Proc + id
resp, err := cronsun.DefalutClient.Get(procKey)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
if len(resp.Kvs) < 1 {
outJSONWithCode(ctx.W, http.StatusInternalServerError, nil)
return
}
var procVal cronsun.Process
err = json.Unmarshal(resp.Kvs[0].Value, &procVal)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
procVal.Killed = true
newVal, err := json.Marshal(procVal)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
_, err = cronsun.DefalutClient.Put(procKey, string(newVal))
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
outJSONWithCode(ctx.W, http.StatusOK, "杀死进程成功")
}
type ProcFetchOptions struct {
Groups []string
NodeIds []string

View File

@ -76,6 +76,10 @@ func initRouters() (s *http.Server, err error) {
h = NewAuthHandler(jobHandler.GetExecutingJob, cronsun.Reporter)
subrouter.Handle("/job/executing", h).Methods("GET")
// kill an executing job
h = NewAuthHandler(jobHandler.KillExecutingJob, cronsun.Developer)
subrouter.Handle("/job/executing/{id}", h).Methods("DELETE")
// get job log list
h = NewAuthHandler(jobLogHandler.GetList, cronsun.Reporter)
subrouter.Handle("/logs", h).Methods("GET")

File diff suppressed because one or more lines are too long

View File

@ -61,7 +61,7 @@
<div id="app">
<div id="initloader"></div>
</div>
<script src="build.js?v=6fe9c2f"></script>
<script src="build.js?v=ed6e655"></script>
</body>
</html>

View File

@ -13,7 +13,7 @@
"chart.js": "^2.5.0",
"jquery": "^3.1.1",
"jquery.cookie": "^1.4.1",
"semantic-ui": "^2.2.7",
"semantic-ui": "^2.3.3",
"vue": "^2.3.4",
"vue-router": "^2.2.1",
"vuex": "^2.3.1"

View File

@ -211,7 +211,7 @@ export default {
}
}
}).
onfailed((msg)=> vm.$bus.$emit('error', data)).
onfailed((msg)=> vm.$bus.$emit('error', msg)).
do();
}

View File

@ -1,5 +1,6 @@
<style scope>
.clearfix:after {content:""; clear:both; display:table;}
.kill-proc-btn { color:red;cursor: pointer;}
</style>
<template>
<div>
@ -33,6 +34,7 @@
<th class="center aligned">{{$L('node')}}</th>
<th class="center aligned">{{$L('process ID')}}</th>
<th class="center aligned">{{$L('starting time')}}</th>
<th class="center aligned">{{$L('operation')}}</th>
</tr>
</thead>
<tbody>
@ -42,6 +44,7 @@
<td class="center aligned">{{$store.getters.hostshows(proc.nodeId)}}</td>
<td class="center aligned">{{proc.id}}</td>
<td class="center aligned">{{proc.time}}</td>
<td class="center aligned"><a class="kill-proc-btn" v-on:click="killProc(proc, index)">{{$L('kill process')}}</a></td>
</tr>
</tbody>
</table>
@ -100,6 +103,18 @@ export default {
this.$router.push('/job/executing?'+this.buildQuery());
},
killProc(proc, index) {
if (confirm(this.$L("whether to kill the process"))) {
var id = proc.nodeId + "." + proc.group + "." + proc.jobId + "." + proc.id;
this.$rest.DELETE('job/executing/' + id)
.onsucceed(200, (resp) => {
this.executings.splice(index, 1);
})
.onfailed((resp)=>{vm.$bus.$emit('error', resp)})
.do();
}
},
buildQuery(){
var params = [];
if (this.groups && this.groups.length > 0) params.push('groups='+this.groups.join(','));

View File

@ -54,6 +54,7 @@ var language = {
'view job list': 'View job list',
'starting time': 'Starting time',
'process ID': 'Process ID',
'kill process': 'Kill process',
'group filter': 'Group filter',
'node filter': 'Node filter',
'select a group': 'Select a group',
@ -83,6 +84,7 @@ var language = {
'job type': 'Job type',
'common job': 'Common',
'single node single process': 'Single Node Single Process',
'whether to kill the process': 'Whether to kill the process',
'group level common': 'Group Level Common',
'group level common help': 'It is difficult to name it, a simple way to explain is that we merge all selected nodes as a large node, then the job\'s behavior will looks like a Common job in a single node.',
'warning on': 'Warning ON',

View File

@ -55,6 +55,7 @@ var language = {
'view job list': '查看任务列表',
'starting time': '开始时间',
'process ID': '进程ID',
'kill process': '杀死进程',
'group filter': '分组过滤',
'node filter': '节点过滤',
@ -85,6 +86,7 @@ var language = {
'job type': '任务类型',
'common job': '普通任务',
'single node single process': '单机单进程',
'whether to kill the process': '是否杀死该进程',
'group level common': '组级别普通任务',
'group level common help': '暂时没想到好名字,一个比较简单的说明是,把所有选中的节点视为一个大节点,那么该类型的任务就相当于在单个节点上的普通任务',
'warning on': '开启报警',

View File

@ -4,9 +4,27 @@ require('semantic-ui/dist/semantic.min.css');
import store from './vuex/store';
import Vue from 'vue';
import Lang from './i18n/language';
// global restful client
import Rest from './libraries/rest-client.js';
import VueRouter from 'vue-router';
import App from './App.vue';
import Dash from './components/Dash.vue';
import Log from './components/Log.vue';
import LogDetail from './components/LogDetail.vue';
import Job from './components/Job.vue';
import JobEdit from './components/JobEdit.vue';
import JobExecuting from './components/JobExecuting.vue';
import Node from './components/Node.vue';
import NodeGroup from './components/NodeGroup.vue';
import NodeGroupEdit from './components/NodeGroupEdit.vue';
import Account from './components/Account.vue';
import AccountEdit from './components/AccountEdit.vue';
import Profile from './components/Profile.vue';
import Login from './components/Login.vue';
Vue.config.debug = true;
import Lang from './i18n/language';
Vue.use((Vue) => {
Vue.prototype.$L = Lang.L
Vue.prototype.$Lang = Lang
@ -18,35 +36,34 @@ Vue.use((Vue) => {
Vue.prototype.$bus = bus;
});
// global restful client
import Rest from './libraries/rest-client.js';
var restApi = new Rest('/v1/', (msg) => {
bus.$emit('error', msg);
}, (msg) => {
bus.$emit('error', msg);
}, {
401: (data, xhr) => { bus.$emit('goLogin') }
});
401: (data, xhr) => {
bus.$emit('goLogin')
}
});
Vue.use((Vue, options) => {
Vue.prototype.$rest = restApi;
}, null);
import VueRouter from 'vue-router';
Vue.use(VueRouter);
Vue.use((Vue) => {
Vue.prototype.$loadConfiguration = () => {
restApi.GET('configurations').
onsucceed(200, (resp) => {
const Config = (Vue, options) => {
Vue.prototype.$appConfig = resp;
}
Vue.use(Config);
bus.$emit('conf_loaded', resp);
}).onfailed((data, xhr) => {
var msg = data ? data : xhr.status + ' ' + xhr.statusText;
bus.$emit('error', msg);
}).do();
restApi.GET('configurations').onsucceed(200, (resp) => {
const Config = (Vue, options) => {
Vue.prototype.$appConfig = resp;
}
Vue.use(Config);
bus.$emit('conf_loaded', resp);
}).onfailed((data, xhr) => {
var msg = data ? data : xhr.status + ' ' + xhr.statusText;
bus.$emit('error', msg);
}).do();
}
});
@ -73,38 +90,23 @@ const onConfigLoaded = (Vue, options) => {
}
Vue.use(onConfigLoaded);
import App from './App.vue';
import Dash from './components/Dash.vue';
import Log from './components/Log.vue';
import LogDetail from './components/LogDetail.vue';
import Job from './components/Job.vue';
import JobEdit from './components/JobEdit.vue';
import JobExecuting from './components/JobExecuting.vue';
import Node from './components/Node.vue';
import NodeGroup from './components/NodeGroup.vue';
import NodeGroupEdit from './components/NodeGroupEdit.vue';
import Account from './components/Account.vue';
import AccountEdit from './components/AccountEdit.vue';
import Profile from './components/Profile.vue';
import Login from './components/Login.vue';
var routes = [
{ path: '/', component: Dash },
{ path: '/log', component: Log },
{ path: '/log/:id', component: LogDetail },
{ path: '/job', component: Job },
{ path: '/job/create', component: JobEdit },
{ path: '/job/edit/:group/:id', component: JobEdit },
{ path: '/job/executing', component: JobExecuting },
{ path: '/node', component: Node },
{ path: '/node/group', component: NodeGroup },
{ path: '/node/group/create', component: NodeGroupEdit },
{ path: '/node/group/:id', component: NodeGroupEdit },
{ path: '/admin/account/list', component: Account },
{ path: '/admin/account/add', component: AccountEdit },
{ path: '/admin/account/edit', component: AccountEdit },
{ path: '/user/setpwd', component: Profile },
{ path: '/login', component: Login }
{path: '/', component: Dash},
{path: '/log', component: Log},
{path: '/log/:id', component: LogDetail},
{path: '/job', component: Job},
{path: '/job/create', component: JobEdit},
{path: '/job/edit/:group/:id', component: JobEdit},
{path: '/job/executing', component: JobExecuting},
{path: '/node', component: Node},
{path: '/node/group', component: NodeGroup},
{path: '/node/group/create', component: NodeGroupEdit},
{path: '/node/group/:id', component: NodeGroupEdit},
{path: '/admin/account/list', component: Account},
{path: '/admin/account/add', component: AccountEdit},
{path: '/admin/account/edit', component: AccountEdit},
{path: '/user/setpwd', component: Profile},
{path: '/login', component: Login}
];
var router = new VueRouter({
@ -118,22 +120,20 @@ bus.$on('goLogin', () => {
});
var initConf = new Promise((resolve) => {
restApi.GET('session?check=1').
onsucceed(200, (resp) => {
restApi.GET('session?check=1').onsucceed(200, (resp) => {
store.commit('enabledAuth', resp.enabledAuth);
store.commit('setEmail', resp.email);
store.commit('setRole', resp.role);
restApi.GET('version').onsucceed(200, (resp)=>{
restApi.GET('version').onsucceed(200, (resp) => {
store.commit('setVersion', resp);
}).do();
restApi.GET('configurations').
onsucceed(200, (resp) => {
restApi.GET('configurations').onsucceed(200, (resp) => {
Vue.use((Vue) => Vue.prototype.$appConfig = resp);
bus.$emit('conf_loaded', resp);
restApi.GET('nodes').onsucceed(200, (resp)=>{
restApi.GET('nodes').onsucceed(200, (resp) => {
var nodes = {};
for (var i in resp) {
nodes[resp[i].id] = resp[i];
@ -153,8 +153,7 @@ var initConf = new Promise((resolve) => {
}
router.push('/login');
resolve()
}).
do();
}).do();
})
initConf.then(() => {