mirror of https://github.com/shunfei/cronsun
立即执行任务功能
parent
01b4ba5d6b
commit
e254eb5a2c
|
@ -115,13 +115,13 @@ func (n *Node) loadJobs() (err error) {
|
|||
|
||||
func (n *Node) addJob(job *models.Job, notice bool) {
|
||||
n.link.addJob(job)
|
||||
n.jobs[job.ID] = job
|
||||
|
||||
cmds := job.Cmds(n.ID, n.groups)
|
||||
if len(cmds) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
n.jobs[job.ID] = job
|
||||
for _, cmd := range cmds {
|
||||
n.addCmd(cmd, notice)
|
||||
}
|
||||
|
@ -400,7 +400,7 @@ func (n *Node) watchOnce() {
|
|||
}
|
||||
|
||||
job, ok := n.jobs[models.GetIDFromKey(string(ev.Kv.Key))]
|
||||
if !ok || job.IsRunOn(n.ID, n.groups) {
|
||||
if !ok || !job.IsRunOn(n.ID, n.groups) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -427,6 +427,7 @@ func (n *Node) Run() (err error) {
|
|||
n.Cron.Start()
|
||||
go n.watchJobs()
|
||||
go n.watchGroups()
|
||||
go n.watchOnce()
|
||||
n.Node.On()
|
||||
return
|
||||
}
|
||||
|
|
77
web/base.go
77
web/base.go
|
@ -6,6 +6,9 @@ import (
|
|||
"fmt"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"sunteng/commons/log"
|
||||
)
|
||||
|
@ -54,6 +57,44 @@ func outJSON(w http.ResponseWriter, data interface{}) {
|
|||
outJSONWithCode(w, http.StatusOK, data)
|
||||
}
|
||||
|
||||
func getStringArrayFromQuery(name, sep string, r *http.Request) (arr []string) {
|
||||
val := strings.TrimSpace(r.FormValue(name))
|
||||
if len(val) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
return strings.Split(val, sep)
|
||||
}
|
||||
|
||||
func getPage(page string) int {
|
||||
p, err := strconv.Atoi(page)
|
||||
if err != nil || p < 1 {
|
||||
p = 1
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func getPageSize(ps string) int {
|
||||
p, err := strconv.Atoi(ps)
|
||||
if err != nil || p < 1 {
|
||||
p = 50
|
||||
} else if p > 200 {
|
||||
p = 200
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func getTime(t string) time.Time {
|
||||
t = strings.TrimSpace(t)
|
||||
time, _ := time.Parse("2006-01-02", t)
|
||||
return time
|
||||
}
|
||||
|
||||
func getStringVal(n string, r *http.Request) string {
|
||||
return strings.TrimSpace(r.FormValue(n))
|
||||
}
|
||||
|
||||
func InStringArray(k string, ss []string) bool {
|
||||
for i := range ss {
|
||||
if ss[i] == k {
|
||||
|
@ -63,3 +104,39 @@ func InStringArray(k string, ss []string) bool {
|
|||
|
||||
return false
|
||||
}
|
||||
|
||||
func UniqueStringArray(a []string) []string {
|
||||
al := len(a)
|
||||
if al == 0 {
|
||||
return a
|
||||
}
|
||||
|
||||
ret := make([]string, al)
|
||||
index := 0
|
||||
|
||||
loopa:
|
||||
for i := 0; i < al; i++ {
|
||||
for j := 0; j < index; j++ {
|
||||
if a[i] == ret[j] {
|
||||
continue loopa
|
||||
}
|
||||
}
|
||||
ret[index] = a[i]
|
||||
index++
|
||||
}
|
||||
|
||||
return ret[:index]
|
||||
}
|
||||
|
||||
// 返回存在于 a 且不存在于 b 中的元素集合
|
||||
func SubtractStringArray(a, b []string) (c []string) {
|
||||
c = []string{}
|
||||
|
||||
for _, _a := range a {
|
||||
if !InStringArray(_a, b) {
|
||||
c = append(c, _a)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
|
62
web/job.go
62
web/job.go
|
@ -198,11 +198,67 @@ func (j *Job) GetList(w http.ResponseWriter, r *http.Request) {
|
|||
outJSON(w, jobList)
|
||||
}
|
||||
|
||||
func (j *Job) GetJobNodes(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
job, err := models.GetJob(vars["group"], vars["id"])
|
||||
var statusCode int
|
||||
if err != nil {
|
||||
if err == models.ErrNotFound {
|
||||
statusCode = http.StatusNotFound
|
||||
} else {
|
||||
statusCode = http.StatusInternalServerError
|
||||
}
|
||||
outJSONWithCode(w, statusCode, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var nodes []string
|
||||
var exNodes []string
|
||||
groups, err := models.GetGroups("")
|
||||
if err != nil {
|
||||
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for i := range job.Rules {
|
||||
inNodes := append(nodes, job.Rules[i].NodeIDs...)
|
||||
for _, gid := range job.Rules[i].GroupIDs {
|
||||
if g, ok := groups[gid]; ok {
|
||||
inNodes = append(inNodes, g.NodeIDs...)
|
||||
}
|
||||
}
|
||||
exNodes = append(exNodes, job.Rules[i].ExcludeNodeIDs...)
|
||||
inNodes = SubtractStringArray(inNodes, exNodes)
|
||||
nodes = append(nodes, inNodes...)
|
||||
}
|
||||
|
||||
outJSON(w, UniqueStringArray(nodes))
|
||||
}
|
||||
|
||||
func (j *Job) JobExecute(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
group := strings.TrimSpace(vars["group"])
|
||||
id := strings.TrimSpace(vars["id"])
|
||||
if len(group) == 0 || len(id) == 0 {
|
||||
outJSONWithCode(w, http.StatusBadRequest, "Invalid job id or group.")
|
||||
return
|
||||
}
|
||||
|
||||
node := getStringVal("node", r)
|
||||
err := models.PutOnce(group, id, node)
|
||||
if err != nil {
|
||||
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
outJSONWithCode(w, http.StatusNoContent, nil)
|
||||
}
|
||||
|
||||
func (j *Job) GetExecutingJob(w http.ResponseWriter, r *http.Request) {
|
||||
opt := &ProcFetchOptions{
|
||||
Groups: GetStringArrayFromQuery("groups", ",", r),
|
||||
NodeIds: GetStringArrayFromQuery("nodes", ",", r),
|
||||
JobIds: GetStringArrayFromQuery("jobs", ",", r),
|
||||
Groups: getStringArrayFromQuery("groups", ",", r),
|
||||
NodeIds: getStringArrayFromQuery("nodes", ",", r),
|
||||
JobIds: getStringArrayFromQuery("jobs", ",", r),
|
||||
}
|
||||
|
||||
gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix())
|
||||
|
|
|
@ -2,7 +2,6 @@ package web
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -44,9 +43,9 @@ func (jl *JobLog) GetDetail(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (jl *JobLog) GetList(w http.ResponseWriter, r *http.Request) {
|
||||
nodes := GetStringArrayFromQuery("nodes", ",", r)
|
||||
names := GetStringArrayFromQuery("names", ",", r)
|
||||
ids := GetStringArrayFromQuery("ids", ",", r)
|
||||
nodes := getStringArrayFromQuery("nodes", ",", r)
|
||||
names := getStringArrayFromQuery("names", ",", r)
|
||||
ids := getStringArrayFromQuery("ids", ",", r)
|
||||
begin := getTime(r.FormValue("begin"))
|
||||
end := getTime(r.FormValue("end"))
|
||||
page := getPage(r.FormValue("page"))
|
||||
|
@ -112,37 +111,3 @@ func (jl *JobLog) GetList(w http.ResponseWriter, r *http.Request) {
|
|||
pager.Total = int(math.Ceil(float64(pager.Total) / float64(pageSize)))
|
||||
outJSON(w, pager)
|
||||
}
|
||||
|
||||
func GetStringArrayFromQuery(name, sep string, r *http.Request) (arr []string) {
|
||||
val := strings.TrimSpace(r.FormValue(name))
|
||||
if len(val) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
return strings.Split(val, sep)
|
||||
}
|
||||
|
||||
func getPage(page string) int {
|
||||
p, err := strconv.Atoi(page)
|
||||
if err != nil || p < 1 {
|
||||
p = 1
|
||||
}
|
||||
|
||||
return p
|
||||
}
|
||||
|
||||
func getPageSize(ps string) int {
|
||||
p, err := strconv.Atoi(ps)
|
||||
if err != nil || p < 1 {
|
||||
p = 50
|
||||
} else if p > 200 {
|
||||
p = 200
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func getTime(t string) time.Time {
|
||||
t = strings.TrimSpace(t)
|
||||
time, _ := time.Parse("2006-01-02", t)
|
||||
return time
|
||||
}
|
||||
|
|
|
@ -38,6 +38,12 @@ func InitRouters() (s *http.Server, err error) {
|
|||
h = BaseHandler{Handle: jobHandler.DeleteJob}
|
||||
subrouter.Handle("/job/{group}-{id}", h).Methods("DELETE")
|
||||
|
||||
h = BaseHandler{Handle: jobHandler.GetJobNodes}
|
||||
subrouter.Handle("/job/{group}-{id}/nodes", h).Methods("GET")
|
||||
|
||||
h = BaseHandler{Handle: jobHandler.JobExecute}
|
||||
subrouter.Handle("/job/{group}-{id}/execute", h).Methods("PUT")
|
||||
|
||||
// query executing job
|
||||
h = BaseHandler{Handle: jobHandler.GetExecutingJob}
|
||||
subrouter.Handle("/job/executing", h).Methods("GET")
|
||||
|
|
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
|
@ -0,0 +1,89 @@
|
|||
<template>
|
||||
<div class="ui modal">
|
||||
<i class="close icon"></i>
|
||||
<div class="header">执行任务 {{jobName}}</div>
|
||||
<div class="content">
|
||||
<Dropdown title="选择执行的节点" :items="nodes" v-on:change="changeNode"></Dropdown>
|
||||
</div>
|
||||
<div class="actions">
|
||||
<div class="ui deny button">取消</div>
|
||||
<div class="ui positive right labeled icon button">立刻执行任务 <i class="checkmark icon"></i></div>
|
||||
</div>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import Dropdown from './basic/Dropdown.vue';
|
||||
|
||||
export default {
|
||||
name: 'execute-job',
|
||||
data(){
|
||||
return {
|
||||
jobGroup: '',
|
||||
jobId: '',
|
||||
jobName: '',
|
||||
nodes: [],
|
||||
selectedNode: '',
|
||||
loading: false
|
||||
}
|
||||
},
|
||||
|
||||
mounted(){},
|
||||
|
||||
methods: {
|
||||
show(jobName, jobGroup, jobId){
|
||||
this.jobName = jobName;
|
||||
this.jobGroup = jobGroup;
|
||||
this.jobId = jobId;
|
||||
this.fetchJobNodes();
|
||||
$(this.$el).modal({
|
||||
closable: false,
|
||||
onApprove: this.submit
|
||||
}).modal('show');
|
||||
},
|
||||
|
||||
hide(){
|
||||
$(this.$el).modal('hide');
|
||||
},
|
||||
|
||||
fetchJobNodes(){
|
||||
var vm = this;
|
||||
this.loading = true;
|
||||
this.$rest.GET('job/'+this.jobGroup+'-'+this.jobId+'/nodes').
|
||||
onsucceed(200, (resp)=>{
|
||||
resp.unshift('全部节点');
|
||||
vm.nodes = resp;
|
||||
}).
|
||||
onfailed((msg)=>{
|
||||
vm.$bus.$emit('error', msg);
|
||||
vm.hide();
|
||||
}).
|
||||
onend(()=>{vm.loading = false}).
|
||||
do();
|
||||
},
|
||||
|
||||
submit(){
|
||||
var vm = this;
|
||||
this.loading = true;
|
||||
var node = this.selectedNode === '全部节点' ? '' : this.selectedNode;
|
||||
this.$rest.PUT('/job/'+this.jobGroup+'-'+this.jobId+'/execute?node='+node).
|
||||
onsucceed(204, ()=>{
|
||||
vm.$bus.$emit('success', '执行命令已发送,注意查看任务日志');
|
||||
vm.hide();
|
||||
}).
|
||||
onfailed((msg)=>{vm.$bus.$emit('error', msg)}).
|
||||
onend(()=>{vm.loading = false}).
|
||||
do();
|
||||
return false;
|
||||
},
|
||||
|
||||
changeNode(val){
|
||||
this.selectedNode = val;
|
||||
}
|
||||
},
|
||||
|
||||
components: {
|
||||
Dropdown
|
||||
}
|
||||
}
|
||||
</script>
|
|
@ -51,17 +51,19 @@
|
|||
<td :class="{error: job.latestStatus && !job.latestStatus.success}">
|
||||
<span v-if="!job.latestStatus">-</span>
|
||||
<router-link v-else :to="'/log/'+job.latestStatus.refLogId">{{job.latestStatus.success ? '成功' : '失败'}}</router-link> |
|
||||
<router-link :to="{path: 'log', query: {latest:true, ids: job.id}}">latest</router-link>
|
||||
<router-link :to="{path: 'log', query: {latest:true, ids: job.id}}">latest</router-link> |
|
||||
<a href="#" title="点此选择节点重新执行任务" v-on:click.prevent="showExecuteJobModal(job.name, job.group, job.id)"><i class="icon repeat"></i></a>
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
<ExecuteJob ref="executeJobModal">
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import Dropdown from './basic/Dropdown.vue';
|
||||
import Pager from './basic/Pager.vue';
|
||||
import ExecuteJob from './ExecuteJob.vue';
|
||||
import {formatTime, formatDuration} from '../libraries/functions';
|
||||
|
||||
export default {
|
||||
|
@ -141,12 +143,16 @@ export default {
|
|||
|
||||
formatLatest: function(latest){
|
||||
return formatTime(latest.beginTime, latest.endTime)+',于 '+latest.node+' 耗时 '+formatDuration(latest.beginTime, latest.endTime);
|
||||
},
|
||||
|
||||
showExecuteJobModal: function(jobName, jobGroup, jobId){
|
||||
this.$refs.executeJobModal.show(jobName, jobGroup, jobId);
|
||||
}
|
||||
},
|
||||
|
||||
components: {
|
||||
Dropdown,
|
||||
Pager
|
||||
ExecuteJob
|
||||
}
|
||||
}
|
||||
</script>
|
|
@ -60,17 +60,20 @@
|
|||
<td>{{log.user}}</td>
|
||||
<td :class="{warning: durationAttention(log.beginTime, log.endTime)}"><i class="attention icon" v-if="durationAttention(log.beginTime, log.endTime)"></i> {{formatTime(log)}}</td>
|
||||
<td :class="{error: !log.success}">
|
||||
<router-link :to="'/log/'+log.id">{{log.success ? '成功' : '失败'}}</router-link>
|
||||
<router-link :to="'/log/'+log.id">{{log.success ? '成功' : '失败'}}</router-link> |
|
||||
<a href="#" title="点此选择节点重新执行任务" v-on:click.prevent="showExecuteJobModal(log.name, log.jobGroup, log.jobId)"><i class="icon repeat"></i></a>
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
<ExecuteJob ref="executeJobModal">
|
||||
<Pager v-if="list && list.length>0" :total="total" :length="5"/>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<script>
|
||||
import Pager from './basic/Pager.vue';
|
||||
import ExecuteJob from './ExecuteJob.vue';
|
||||
import {formatTime, formatDuration} from '../libraries/functions';
|
||||
|
||||
export default {
|
||||
|
@ -130,7 +133,7 @@ export default {
|
|||
vm.list = resp.list;
|
||||
vm.total = resp.total;
|
||||
})
|
||||
.onfailed((resp)=>{console.log(resp)})
|
||||
.onfailed((msg)=>{vm.$bus.$emit('error', msg)})
|
||||
.onend(()=>{vm.loading=false})
|
||||
.do();
|
||||
},
|
||||
|
@ -166,10 +169,15 @@ export default {
|
|||
|
||||
formatTime: function(log){
|
||||
return formatTime(log.beginTime, log.endTime)+',于 '+log.node+' 耗时 '+formatDuration(log.beginTime, log.endTime);
|
||||
},
|
||||
|
||||
showExecuteJobModal: function(jobName, jobGroup, jobId){
|
||||
this.$refs.executeJobModal.show(jobName, jobGroup, jobId);
|
||||
}
|
||||
},
|
||||
components: {
|
||||
Pager
|
||||
Pager,
|
||||
ExecuteJob
|
||||
}
|
||||
}
|
||||
</script>
|
|
@ -19,7 +19,7 @@ export default {
|
|||
},
|
||||
|
||||
mounted: function() {
|
||||
if (this.title.length === 0) this.title = '选择分组';
|
||||
if (!this.title || this.title.length === 0) this.title = '选择分组';
|
||||
|
||||
var vm = this;
|
||||
$(this.$el).dropdown({
|
||||
|
|
Loading…
Reference in New Issue