cronsun/web/node.go

166 lines
3.8 KiB
Go
Raw Normal View History

2017-01-09 02:32:14 +00:00
package web
2017-01-11 08:12:37 +00:00
import (
"encoding/json"
2017-05-12 07:38:50 +00:00
"fmt"
2017-01-11 08:12:37 +00:00
"net/http"
"strings"
2017-01-12 08:35:30 +00:00
v3 "github.com/coreos/etcd/clientv3"
2017-01-11 08:12:37 +00:00
"github.com/gorilla/mux"
2017-05-12 06:48:24 +00:00
"github.com/shunfei/cronsun"
2017-05-12 07:38:50 +00:00
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
2017-01-11 08:12:37 +00:00
)
// Node is the stateless receiver for the node and node-group HTTP handlers
// defined in this file.
type Node struct{}
// ngKeyDeepLen is the length of conf.Config.Group, computed once when this
// package's variables are initialized.
// NOTE(review): if conf.Config is only populated after package
// initialization, this captures the pre-load value — confirm. It is not
// referenced by any handler visible in this file.
var ngKeyDeepLen = len(conf.Config.Group)
2017-01-12 08:35:30 +00:00
func (n *Node) UpdateGroup(w http.ResponseWriter, r *http.Request) {
2017-05-12 06:48:24 +00:00
g := cronsun.Group{}
2017-01-12 08:35:30 +00:00
de := json.NewDecoder(r.Body)
var err error
if err = de.Decode(&g); err != nil {
outJSONWithCode(w, http.StatusBadRequest, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-01-12 08:35:30 +00:00
defer r.Body.Close()
2017-01-11 08:12:37 +00:00
2017-01-12 08:35:30 +00:00
var successCode = http.StatusOK
g.ID = strings.TrimSpace(g.ID)
if len(g.ID) == 0 {
successCode = http.StatusCreated
2017-05-12 06:48:24 +00:00
g.ID = cronsun.NextID()
2017-01-12 08:35:30 +00:00
}
if err = g.Check(); err != nil {
outJSONWithCode(w, http.StatusBadRequest, err.Error())
2017-01-12 08:35:30 +00:00
return
2017-01-11 08:12:37 +00:00
}
// @TODO modRev
var modRev int64 = 0
if _, err = g.Put(modRev); err != nil {
outJSONWithCode(w, http.StatusBadRequest, err.Error())
2017-01-12 08:35:30 +00:00
return
2017-01-11 08:12:37 +00:00
}
2017-01-12 08:35:30 +00:00
outJSONWithCode(w, successCode, nil)
2017-01-11 08:12:37 +00:00
}
2017-01-12 08:35:30 +00:00
func (n *Node) GetGroups(w http.ResponseWriter, r *http.Request) {
2017-05-12 06:48:24 +00:00
list, err := cronsun.GetNodeGroups()
2017-01-11 08:12:37 +00:00
if err != nil {
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-01-12 08:35:30 +00:00
outJSON(w, list)
2017-01-11 08:12:37 +00:00
}
2017-01-12 08:35:30 +00:00
func (n *Node) GetGroupByGroupId(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
2017-05-12 06:48:24 +00:00
g, err := cronsun.GetGroupById(vars["id"])
2017-01-11 08:12:37 +00:00
if err != nil {
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
2017-01-11 08:12:37 +00:00
return
}
2017-01-12 08:35:30 +00:00
if g == nil {
outJSONWithCode(w, http.StatusNotFound, nil)
2017-01-11 08:12:37 +00:00
return
}
2017-01-12 08:35:30 +00:00
outJSON(w, g)
2017-01-11 08:12:37 +00:00
}
2017-01-12 08:35:30 +00:00
func (n *Node) DeleteGroup(w http.ResponseWriter, r *http.Request) {
2017-01-16 06:30:55 +00:00
vars := mux.Vars(r)
2017-02-20 06:53:36 +00:00
groupId := strings.TrimSpace(vars["id"])
if len(groupId) == 0 {
outJSONWithCode(w, http.StatusBadRequest, "empty node ground id.")
2017-02-20 06:53:36 +00:00
return
}
2017-05-12 06:48:24 +00:00
_, err := cronsun.DeleteGroupById(groupId)
2017-01-16 06:30:55 +00:00
if err != nil {
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
2017-01-16 06:30:55 +00:00
return
}
2017-05-12 06:48:24 +00:00
gresp, err := cronsun.DefalutClient.Get(conf.Config.Cmd, v3.WithPrefix())
2017-02-20 06:53:36 +00:00
if err != nil {
errstr := fmt.Sprintf("failed to fetch jobs from etcd after deleted node group[%s]: %s", groupId, err.Error())
2017-05-12 07:38:50 +00:00
log.Errorf(errstr)
outJSONWithCode(w, http.StatusInternalServerError, errstr)
2017-02-20 06:53:36 +00:00
return
}
// update rule's node group
for i := range gresp.Kvs {
2017-05-12 06:48:24 +00:00
job := cronsun.Job{}
2017-02-20 06:53:36 +00:00
err = json.Unmarshal(gresp.Kvs[i].Value, &job)
key := string(gresp.Kvs[i].Key)
if err != nil {
log.Errorf("failed to unmarshal job[%s]: %s", key, err.Error())
continue
}
update := false
for j := range job.Rules {
var ngs []string
for _, gid := range job.Rules[j].GroupIDs {
if gid != groupId {
ngs = append(ngs, gid)
}
}
if len(ngs) != len(job.Rules[j].GroupIDs) {
job.Rules[j].GroupIDs = ngs
update = true
}
}
if update {
v, err := json.Marshal(&job)
if err != nil {
log.Errorf("failed to marshal job[%s]: %s", key, err.Error())
continue
}
2017-05-12 06:48:24 +00:00
_, err = cronsun.DefalutClient.PutWithModRev(key, string(v), gresp.Kvs[i].ModRevision)
2017-02-20 06:53:36 +00:00
if err != nil {
log.Errorf("failed to update job[%s]: %s", key, err.Error())
continue
}
}
}
2017-01-16 06:30:55 +00:00
outJSONWithCode(w, http.StatusNoContent, nil)
2017-01-12 08:35:30 +00:00
}
2017-02-14 07:15:00 +00:00
func (n *Node) GetNodes(w http.ResponseWriter, r *http.Request) {
2017-05-12 06:48:24 +00:00
nodes, err := cronsun.GetNodes()
if err != nil {
outJSONWithCode(w, http.StatusInternalServerError, err.Error())
return
}
2017-05-12 06:48:24 +00:00
gresp, err := cronsun.DefalutClient.Get(conf.Config.Node, v3.WithPrefix(), v3.WithKeysOnly())
if err == nil {
connecedMap := make(map[string]bool, gresp.Count)
for i := range gresp.Kvs {
k := string(gresp.Kvs[i].Key)
index := strings.LastIndexByte(k, '/')
connecedMap[k[index+1:]] = true
}
for i := range nodes {
nodes[i].Connected = connecedMap[nodes[i].ID]
}
} else {
log.Errorf("failed to fetch key[%s] from etcd: %s", conf.Config.Node, err.Error())
}
2017-02-14 07:15:00 +00:00
outJSONWithCode(w, http.StatusOK, nodes)
}