查看执行中的任务

pull/1/head
Doflatango 8 years ago committed by miraclesu
parent f0f33f4a91
commit e2974c40d6

@ -220,7 +220,6 @@ func (j *Job) Run() {
cmd = exec.Command(j.cmd[0], j.cmd[1:]...)
}
cmd.SysProcAttr = sysProcAttr
var b bytes.Buffer
cmd.Stdout = &b
cmd.Stderr = &b

@ -2,11 +2,13 @@ package models
import (
"context"
"fmt"
"sync"
"time"
client "github.com/coreos/etcd/clientv3"
"strings"
"sunteng/commons/log"
"sunteng/cronsun/conf"
)
@ -88,7 +90,7 @@ func (l *leaseID) set() error {
}
func (l *leaseID) keepAlive() {
duration := time.Duration(l.ttl)
duration := time.Duration(l.ttl) * time.Second
timer := time.NewTimer(duration)
for {
select {
@ -124,19 +126,36 @@ func (l *leaseID) keepAlive() {
}
// 当前执行中的任务信息
// key: /cronsun/proc/node/job id/pid
// key: /cronsun/proc/node/group/jobId/pid
// value: 开始执行时间
// key 会自动过期,防止进程意外退出后没有清除相关 key过期时间可配置
type Process struct {
ID string `json:"id"`
JobID string `json:"job_id"`
ID string `json:"id"` // pid
JobID string `json:"jobId"`
Group string `json:"group"`
NodeID string `json:"node_id"`
Time time.Time `json:"name"` // 开始执行时间
NodeID string `json:"nodeId"`
Time time.Time `json:"time"` // 开始执行时间
running bool
}
func GetProcFromKey(key string) (proc *Process, err error) {
ss := strings.Split(key, "/")
var sslen = len(ss)
if sslen < 5 {
err = fmt.Errorf("invalid proc key [%s]", err.Error())
return
}
proc = &Process{
ID: ss[sslen-1],
JobID: ss[sslen-2],
Group: ss[sslen-3],
NodeID: ss[sslen-4],
}
return
}
func (p *Process) Key() string {
return conf.Config.Proc + p.NodeID + "/" + p.Group + "/" + p.JobID + "/" + p.ID
}
@ -147,7 +166,7 @@ func (p *Process) Val() string {
// 获取结点正在执行任务的数量
func (j *Job) CountRunning() (int64, error) {
resp, err := DefalutClient.Get(conf.Config.Proc + j.runOn + "/" + j.Group + "/" + j.ID)
resp, err := DefalutClient.Get(conf.Config.Proc+j.runOn+"/"+j.Group+"/"+j.ID, client.WithPrefix(), client.WithCountOnly())
if err != nil {
return 0, err
}

@ -53,3 +53,13 @@ func outJSONWithCode(w http.ResponseWriter, httpCode int, data interface{}) {
func outJSON(w http.ResponseWriter, data interface{}) {
outJSONWithCode(w, http.StatusOK, data)
}
func InStringArray(k string, ss []string) bool {
for i := range ss {
if ss[i] == k {
return true
}
}
return false
}

@ -12,6 +12,7 @@ import (
"sunteng/commons/log"
"sunteng/cronsun/conf"
"sunteng/cronsun/models"
"time"
)
type Job struct{}
@ -196,3 +197,64 @@ func (j *Job) GetList(w http.ResponseWriter, r *http.Request) {
outJSON(w, jobList)
}
func (j *Job) GetExecutingJob(w http.ResponseWriter, r *http.Request) {
opt := &ProcFetchOptions{
Groups: GetStringArrayFromQuery("groups", ",", r),
NodeIds: GetStringArrayFromQuery("nodes", ",", r),
JobIds: GetStringArrayFromQuery("jobs", ",", r),
}
gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix())
if err != nil {
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
return
}
var list = make([]*models.Process, 0, 8)
for i := range gresp.Kvs {
proc, err := models.GetProcFromKey(string(gresp.Kvs[i].Key))
if err != nil {
log.Error("Failed to unmarshal Proc from key: ", err.Error())
continue
}
if !opt.Match(proc) {
continue
}
proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value))
list = append(list, proc)
}
sort.Sort(ByProcTime(list))
outJSON(w, list)
}
type ProcFetchOptions struct {
Groups []string
NodeIds []string
JobIds []string
}
func (opt *ProcFetchOptions) Match(proc *models.Process) bool {
if len(opt.Groups) > 0 && !InStringArray(proc.Group, opt.Groups) {
return false
}
if len(opt.JobIds) > 0 && !InStringArray(proc.JobID, opt.JobIds) {
return false
}
if len(opt.NodeIds) > 0 && !InStringArray(proc.NodeID, opt.NodeIds) {
return false
}
return true
}
type ByProcTime []*models.Process
func (a ByProcTime) Len() int { return len(a) }
func (a ByProcTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByProcTime) Less(i, j int) bool { return a[i].Time.After(a[j].Time) }

@ -38,6 +38,10 @@ func InitRouters() (s *http.Server, err error) {
h = BaseHandler{Handle: jobHandler.DeleteJob}
subrouter.Handle("/job/{group}-{id}", h).Methods("DELETE")
// query executing job
h = BaseHandler{Handle: jobHandler.GetExecutingJob}
subrouter.Handle("/job/executing", h).Methods("GET")
// get job log list
h = BaseHandler{Handle: jobLogHandler.GetList}
subrouter.Handle("/logs", h).Methods("GET")

@ -3,9 +3,10 @@
</style>
<template>
<div>
<div class="clearfix">
<div class="clearfix" style="margin-bottom: 20px;">
<router-link class="ui left floated button" to="/job/executing">查看执行中的任务</router-link>
<button class="ui left floated icon button" v-on:click="refresh"><i class="refresh icon"></i></button>
<router-link class="ui right floated primary button" to="/job/create"><i class="add to calendar icon"></i> 新任务</router-link>
<button class="ui right floated icon button" v-on:click="refresh"><i class="refresh icon"></i></button>
</div>
<form class="ui form">
<div class="field">

@ -0,0 +1,140 @@
<style scope>
.clearfix:after {content:""; clear:both; display:table;}
</style>
<template>
<div>
<div class="clearfix" style="margin-bottom: 20px;">
<router-link class="ui left floated button" to="/job">查看任务列表</router-link>
<button class="ui left floated icon button" v-on:click="refresh"><i class="refresh icon"></i></button>
<router-link class="ui right floated primary button" to="/job/create"><i class="add to calendar icon"></i> 新任务</router-link>
</div>
<form class="ui form" v-bind:class="{loading:loading}" v-on:submit.prevent>
<div class="field">
<label>任务 ID</label>
<input type="text" ref="ids" v-model:value="ids" placeholder="多个 ID 使用英文逗号分隔"/>
</div>
<div class="field">
<label>选择分组</label>
<Dropdown title="选择分组" v-bind:items="prefetchs.groups" v-on:change="changeGroup" :selected="groups" :multiple="true"/>
</div>
<div class="field">
<label>选择节点</label>
<Dropdown title="选择节点" v-bind:items="prefetchs.nodes" v-on:change="changeNodes" :selected="nodes" :multiple="true"/>
</div>
<div class="field">
<button class="fluid ui button" type="button" v-on:click="submit">查询</button>
</div>
</form>
<table class="ui hover blue table" v-if="executings.length > 0">
<thead>
<tr>
<th class="center aligned">任务ID</th>
<th width="200px" class="center aligned">分组</th>
<th class="center aligned">节点</th>
<th class="center aligned">进程ID</th>
<th class="center aligned">开始时间</th>
</tr>
</thead>
<tbody>
<tr v-for="(proc, index) in executings">
<td class="center aligned"><router-link :to="'/job/edit/'+proc.group+'/'+proc.jobId">{{proc.jobId}}</router-link></td>
<td class="center aligned">{{proc.group}}</td>
<td class="center aligned">{{proc.nodeId}}</td>
<td class="center aligned">{{proc.id}}</td>
<td class="center aligned">{{proc.time}}</td>
</tr>
</tbody>
</table>
</div>
</template>
<script>
import Dropdown from './basic/Dropdown.vue';
import {split} from '../libraries/functions';
export default {
name: 'job-executing',
data(){
return {
prefetchs: {groups: [], nodes: []},
loading: false,
groups: [],
ids: '',
nodes: [],
executings: []
}
},
mounted(){
var vm = this;
this.groups = split(this.$route.query.groups, ',');
this.nodes = split(this.$route.query.nodes, ',');
this.ids = this.$route.query.ids || '';
this.$rest.GET('job/groups').onsucceed(200, (resp)=>{
!resp.includes('default') && resp.unshift('default');
vm.prefetchs.groups = resp;
this.fetchList(this.buildQuery());
}).do();
this.$rest.GET('nodes').onsucceed(200, (resp)=>{
for (var i in resp) {
vm.prefetchs.nodes.push(resp[i].id);
}
}).do();
},
watch: {
'$route': function(){
this.groups = split(this.$route.query.groups, ',');
this.nodes = split(this.$route.query.nodes, ',');
this.ids = this.$route.query.ids || '';
this.fetchList(this.buildQuery());
}
},
methods: {
changeGroup(val, text){
this.groups = split(val, ',');
},
changeNodes(val){
this.nodes = split(val, ',');
},
submit(){
this.$router.push('/job/executing?'+this.buildQuery());
},
buildQuery(){
var params = [];
if (this.groups && this.groups.length > 0) params.push('groups='+this.groups.join(','));
if (this.nodes && this.nodes.length > 0) params.push('nodes='+this.nodes.join(','));
if (this.ids) params.push('ids='+this.ids);
return params.join('&');
},
fetchList(query){
var vm = this;
this.loading = true;
this.$rest.GET('job/executing?'+query).
onsucceed(200, (resp)=>{
vm.executings = resp;
vm.$nextTick(()=>{
$(vm.$el).find('table .ui.dropdown').dropdown();
});
}).
onend(()=>{vm.loading = false}).
do();
},
refresh(){
this.fetchList(this.buildQuery());
}
},
components: {
Dropdown
}
}
</script>

@ -48,4 +48,9 @@ var formatNumber = function(i, len){
return '0'.repeat(len-n) + i.toString();
}
export {formatDuration, formatTime, formatNumber};
var split = function(str, sep){
if (typeof str != 'string' || str.length === 0) return [];
return str.split(sep || ',');
}
export {formatDuration, formatTime, formatNumber, split};

@ -27,6 +27,7 @@ import Log from './components/Log.vue';
import LogDetail from './components/LogDetail.vue';
import Job from './components/Job.vue';
import JobEdit from './components/JobEdit.vue';
import JobExecuting from './components/JobExecuting.vue';
import Node from './components/Node.vue';
import NodeGroup from './components/NodeGroup.vue';
import NodeGroupEdit from './components/NodeGroupEdit.vue';
@ -38,6 +39,7 @@ var routes = [
{path: '/job', component: Job},
{path: '/job/create', component: JobEdit},
{path: '/job/edit/:group/:id', component: JobEdit},
{path: '/job/executing', component: JobExecuting},
{path: '/node', component: Node},
{path: '/node/group', component: NodeGroup},
{path: '/node/group/create', component: NodeGroupEdit},

Loading…
Cancel
Save