cronsun/web/job.go

389 lines
9.0 KiB
Go
Raw Normal View History

2017-01-11 08:12:37 +00:00
package web
import (
"encoding/json"
2017-12-14 08:38:10 +00:00
"fmt"
2017-01-11 08:12:37 +00:00
"net/http"
"sort"
"strings"
2017-05-12 07:38:50 +00:00
"time"
2017-01-11 08:12:37 +00:00
"github.com/coreos/etcd/clientv3"
"github.com/gorilla/mux"
2017-05-12 06:48:24 +00:00
"github.com/shunfei/cronsun"
2017-05-12 07:38:50 +00:00
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
2017-01-11 08:12:37 +00:00
)
// Job is the stateless receiver for the job-related HTTP handlers in this file.
type Job struct{}
func (j *Job) GetJob(ctx *Context) {
vars := mux.Vars(ctx.R)
2017-05-12 06:48:24 +00:00
job, err := cronsun.GetJob(vars["group"], vars["id"])
2017-01-16 06:30:55 +00:00
var statusCode int
if err != nil {
2017-05-12 06:48:24 +00:00
if err == cronsun.ErrNotFound {
2017-01-16 06:30:55 +00:00
statusCode = http.StatusNotFound
} else {
statusCode = http.StatusInternalServerError
}
outJSONWithCode(ctx.W, statusCode, err.Error())
2017-01-16 06:30:55 +00:00
return
}
outJSON(ctx.W, job)
2017-01-16 06:30:55 +00:00
}
func (j *Job) DeleteJob(ctx *Context) {
vars := mux.Vars(ctx.R)
2017-05-12 06:48:24 +00:00
_, err := cronsun.DeleteJob(vars["group"], vars["id"])
2017-01-16 06:30:55 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-01-16 06:30:55 +00:00
return
}
outJSONWithCode(ctx.W, http.StatusNoContent, nil)
2017-01-16 06:30:55 +00:00
}
func (j *Job) ChangeJobStatus(ctx *Context) {
2017-05-12 06:48:24 +00:00
job := &cronsun.Job{}
decoder := json.NewDecoder(ctx.R.Body)
err := decoder.Decode(&job)
if err != nil {
outJSONWithCode(ctx.W, http.StatusBadRequest, err.Error())
return
}
ctx.R.Body.Close()
vars := mux.Vars(ctx.R)
2017-12-14 08:38:10 +00:00
job, err = j.updateJobStatus(vars["group"], vars["id"], job.Pause)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
return
}
2017-12-14 08:38:10 +00:00
outJSON(ctx.W, job)
}
func (j *Job) updateJobStatus(group, id string, isPause bool) (*cronsun.Job, error) {
originJob, rev, err := cronsun.GetJobAndRev(group, id)
if err != nil {
return nil, err
}
if originJob.Pause == isPause {
return nil, err
}
originJob.Pause = isPause
b, err := json.Marshal(originJob)
if err != nil {
2017-12-14 08:38:10 +00:00
return nil, err
}
2017-05-12 06:48:24 +00:00
_, err = cronsun.DefalutClient.PutWithModRev(originJob.Key(), string(b), rev)
if err != nil {
2017-12-14 08:38:10 +00:00
return nil, err
}
return originJob, nil
}
func (j *Job) BatchChangeJobStatus(ctx *Context) {
var jobIds []string
decoder := json.NewDecoder(ctx.R.Body)
err := decoder.Decode(&jobIds)
if err != nil {
outJSONWithCode(ctx.W, http.StatusBadRequest, err.Error())
return
}
ctx.R.Body.Close()
vars := mux.Vars(ctx.R)
op := vars["op"]
var isPause bool
switch op {
case "pause":
isPause = true
case "start":
default:
outJSONWithCode(ctx.W, http.StatusBadRequest, "Unknow batch operation.")
return
}
2017-12-14 08:38:10 +00:00
var updated int
for i := range jobIds {
id := strings.Split(jobIds[i], "/") // [Group, ID]
if len(id) != 2 || id[0] == "" || id[1] == "" {
continue
}
_, err = j.updateJobStatus(id[0], id[1], isPause)
if err != nil {
continue
}
updated++
}
outJSON(ctx.W, fmt.Sprintf("%d of %d updated.", updated, len(jobIds)))
}
func (j *Job) UpdateJob(ctx *Context) {
2017-02-20 02:56:32 +00:00
var job = &struct {
2017-05-12 06:48:24 +00:00
*cronsun.Job
2017-02-20 02:56:32 +00:00
OldGroup string `json:"oldGroup"`
}{}
decoder := json.NewDecoder(ctx.R.Body)
2017-01-11 08:12:37 +00:00
err := decoder.Decode(&job)
if err != nil {
outJSONWithCode(ctx.W, http.StatusBadRequest, err.Error())
2017-01-11 08:12:37 +00:00
return
}
ctx.R.Body.Close()
2017-01-11 08:12:37 +00:00
2017-01-12 08:35:30 +00:00
if err = job.Check(); err != nil {
outJSONWithCode(ctx.W, http.StatusBadRequest, err.Error())
2017-01-12 08:35:30 +00:00
return
}
2017-02-20 02:56:32 +00:00
var deleteOldKey string
2017-01-12 08:35:30 +00:00
var successCode = http.StatusOK
2017-01-11 08:12:37 +00:00
if len(job.ID) == 0 {
2017-01-12 08:35:30 +00:00
successCode = http.StatusCreated
2017-05-12 06:48:24 +00:00
job.ID = cronsun.NextID()
2017-02-20 02:56:32 +00:00
} else {
job.OldGroup = strings.TrimSpace(job.OldGroup)
if job.OldGroup != job.Group {
2017-05-12 06:48:24 +00:00
deleteOldKey = cronsun.JobKey(job.OldGroup, job.ID)
2017-02-20 02:56:32 +00:00
}
2017-01-11 08:12:37 +00:00
}
2017-01-12 08:35:30 +00:00
b, err := json.Marshal(job)
2017-01-11 08:12:37 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-02-20 02:56:32 +00:00
// remove old key
2017-06-13 08:00:42 +00:00
// it should be before the put method
2017-02-20 02:56:32 +00:00
if len(deleteOldKey) > 0 {
2017-05-12 06:48:24 +00:00
if _, err = cronsun.DefalutClient.Delete(deleteOldKey); err != nil {
2017-02-20 02:56:32 +00:00
log.Errorf("failed to remove old job key[%s], err: %s.", deleteOldKey, err.Error())
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-06-13 08:00:42 +00:00
return
2017-02-20 02:56:32 +00:00
}
}
2017-06-13 08:00:42 +00:00
_, err = cronsun.DefalutClient.Put(job.Key(), string(b))
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-06-13 08:00:42 +00:00
return
}
outJSONWithCode(ctx.W, successCode, nil)
2017-01-11 08:12:37 +00:00
}
func (j *Job) GetGroups(ctx *Context) {
2017-05-12 06:48:24 +00:00
resp, err := cronsun.DefalutClient.Get(conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly())
2017-01-11 08:12:37 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-01-21 10:16:45 +00:00
var cmdKeyLen = len(conf.Config.Cmd)
2017-01-11 08:12:37 +00:00
var groupMap = make(map[string]bool, 8)
2017-01-21 10:16:45 +00:00
2017-01-11 08:12:37 +00:00
for i := range resp.Kvs {
2017-01-21 10:16:45 +00:00
ss := strings.Split(string(resp.Kvs[i].Key)[cmdKeyLen:], "/")
groupMap[ss[0]] = true
2017-01-11 08:12:37 +00:00
}
var groupList = make([]string, 0, len(groupMap))
for k := range groupMap {
groupList = append(groupList, k)
}
sort.Strings(groupList)
outJSON(ctx.W, groupList)
2017-01-11 08:12:37 +00:00
}
func (j *Job) GetList(ctx *Context) {
group := getStringVal("group", ctx.R)
node := getStringVal("node", ctx.R)
var prefix = conf.Config.Cmd
if len(group) != 0 {
prefix += group
}
type jobStatus struct {
2017-05-12 06:48:24 +00:00
*cronsun.Job
LatestStatus *cronsun.JobLatestLog `json:"latestStatus"`
}
2017-05-12 06:48:24 +00:00
resp, err := cronsun.DefalutClient.Get(prefix, clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
2017-01-11 08:12:37 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-05-12 06:48:24 +00:00
var nodeGroupMap map[string]*cronsun.Group
2017-03-16 10:27:26 +00:00
if len(node) > 0 {
2017-05-12 06:48:24 +00:00
nodeGrouplist, err := cronsun.GetNodeGroups()
2017-03-16 10:27:26 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-03-16 10:27:26 +00:00
return
}
2017-05-12 06:48:24 +00:00
nodeGroupMap = map[string]*cronsun.Group{}
2017-03-16 10:27:26 +00:00
for i := range nodeGrouplist {
nodeGroupMap[nodeGrouplist[i].ID] = nodeGrouplist[i]
}
}
var jobIds []string
var jobList = make([]*jobStatus, 0, resp.Count)
2017-01-11 08:12:37 +00:00
for i := range resp.Kvs {
2017-05-12 06:48:24 +00:00
job := cronsun.Job{}
2017-01-11 08:12:37 +00:00
err = json.Unmarshal(resp.Kvs[i].Value, &job)
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-03-16 10:27:26 +00:00
if len(node) > 0 && !job.IsRunOn(node, nodeGroupMap) {
continue
}
jobList = append(jobList, &jobStatus{Job: &job})
jobIds = append(jobIds, job.ID)
}
2017-05-12 06:48:24 +00:00
m, err := cronsun.GetJobLatestLogListByJobIds(jobIds)
if err != nil {
2017-05-12 07:38:50 +00:00
log.Errorf("GetJobLatestLogListByJobIds error: %s", err.Error())
} else {
for i := range jobList {
jobList[i].LatestStatus = m[jobList[i].ID]
}
2017-01-11 08:12:37 +00:00
}
outJSON(ctx.W, jobList)
2017-01-11 08:12:37 +00:00
}
2017-03-10 03:06:40 +00:00
func (j *Job) GetJobNodes(ctx *Context) {
vars := mux.Vars(ctx.R)
2017-05-12 06:48:24 +00:00
job, err := cronsun.GetJob(vars["group"], vars["id"])
2017-03-10 08:36:44 +00:00
var statusCode int
if err != nil {
2017-05-12 06:48:24 +00:00
if err == cronsun.ErrNotFound {
2017-03-10 08:36:44 +00:00
statusCode = http.StatusNotFound
} else {
statusCode = http.StatusInternalServerError
}
outJSONWithCode(ctx.W, statusCode, err.Error())
2017-03-10 08:36:44 +00:00
return
}
var nodes []string
var exNodes []string
2017-05-12 06:48:24 +00:00
groups, err := cronsun.GetGroups("")
2017-03-10 08:36:44 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-03-10 08:36:44 +00:00
return
}
for i := range job.Rules {
inNodes := append(nodes, job.Rules[i].NodeIDs...)
for _, gid := range job.Rules[i].GroupIDs {
if g, ok := groups[gid]; ok {
inNodes = append(inNodes, g.NodeIDs...)
}
}
exNodes = append(exNodes, job.Rules[i].ExcludeNodeIDs...)
inNodes = SubtractStringArray(inNodes, exNodes)
nodes = append(nodes, inNodes...)
}
outJSON(ctx.W, UniqueStringArray(nodes))
2017-03-10 08:36:44 +00:00
}
func (j *Job) JobExecute(ctx *Context) {
vars := mux.Vars(ctx.R)
2017-03-10 08:36:44 +00:00
group := strings.TrimSpace(vars["group"])
id := strings.TrimSpace(vars["id"])
if len(group) == 0 || len(id) == 0 {
outJSONWithCode(ctx.W, http.StatusBadRequest, "Invalid job id or group.")
2017-03-10 08:36:44 +00:00
return
}
node := getStringVal("node", ctx.R)
2017-05-12 06:48:24 +00:00
err := cronsun.PutOnce(group, id, node)
2017-03-10 08:36:44 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-03-10 08:36:44 +00:00
return
}
outJSONWithCode(ctx.W, http.StatusNoContent, nil)
2017-03-10 08:36:44 +00:00
}
func (j *Job) GetExecutingJob(ctx *Context) {
2017-03-10 03:06:40 +00:00
opt := &ProcFetchOptions{
Groups: getStringArrayFromQuery("groups", ",", ctx.R),
NodeIds: getStringArrayFromQuery("nodes", ",", ctx.R),
JobIds: getStringArrayFromQuery("jobs", ",", ctx.R),
2017-03-10 03:06:40 +00:00
}
2017-05-12 06:48:24 +00:00
gresp, err := cronsun.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix())
2017-03-10 03:06:40 +00:00
if err != nil {
outJSONWithCode(ctx.W, http.StatusInternalServerError, err.Error())
2017-03-10 03:06:40 +00:00
return
}
2017-05-12 06:48:24 +00:00
var list = make([]*cronsun.Process, 0, 8)
2017-03-10 03:06:40 +00:00
for i := range gresp.Kvs {
2017-05-12 06:48:24 +00:00
proc, err := cronsun.GetProcFromKey(string(gresp.Kvs[i].Key))
2017-03-10 03:06:40 +00:00
if err != nil {
2017-05-12 07:38:50 +00:00
log.Errorf("Failed to unmarshal Proc from key: %s", err.Error())
2017-03-10 03:06:40 +00:00
continue
}
if !opt.Match(proc) {
continue
}
proc.Time, _ = time.Parse(time.RFC3339, string(gresp.Kvs[i].Value))
list = append(list, proc)
}
sort.Sort(ByProcTime(list))
outJSON(ctx.W, list)
2017-03-10 03:06:40 +00:00
}
// ProcFetchOptions holds the filters used when listing executing processes.
// Match treats an empty slice as "no restriction" for that field.
type ProcFetchOptions struct {
Groups []string
NodeIds []string
JobIds []string
}
2017-05-12 06:48:24 +00:00
func (opt *ProcFetchOptions) Match(proc *cronsun.Process) bool {
2017-03-10 03:06:40 +00:00
if len(opt.Groups) > 0 && !InStringArray(proc.Group, opt.Groups) {
return false
}
if len(opt.JobIds) > 0 && !InStringArray(proc.JobID, opt.JobIds) {
return false
}
if len(opt.NodeIds) > 0 && !InStringArray(proc.NodeID, opt.NodeIds) {
return false
}
return true
}
2017-05-12 06:48:24 +00:00
type ByProcTime []*cronsun.Process
2017-03-10 03:06:40 +00:00
func (a ByProcTime) Len() int { return len(a) }
func (a ByProcTime) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByProcTime) Less(i, j int) bool { return a[i].Time.After(a[j].Time) }