diff --git a/conf/conf.go b/conf/conf.go index 0d46753..83ad407 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -34,6 +34,10 @@ func Init() error { } log.InitConf(&Config.Log) + Config.Cmd = cleanKeyPrefix(Config.Cmd) + Config.Proc = cleanKeyPrefix(Config.Proc) + Config.Group = cleanKeyPrefix(Config.Group) + Config.Root = path.Join(Config.Root, "..") initialized = true @@ -59,3 +63,15 @@ type webConfig struct { BindAddr string UIDir string } + +// 返回前后包含斜杆的 /a/b/ 的前缀 +func cleanKeyPrefix(p string) string { + p = path.Clean(p) + if p[0] != '/' { + p = "/" + p + } + + p += "/" + + return p +} diff --git a/models/client.go b/models/client.go index 9d3372e..cf5b3c7 100644 --- a/models/client.go +++ b/models/client.go @@ -1,6 +1,7 @@ package models import ( + "errors" "time" "golang.org/x/net/context" @@ -63,6 +64,31 @@ func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutRespo return c.Client.Put(ctx, key, val, opts...) } +var ErrValueMayChanged = errors.New("The value has been changed by others on this time.") + +func (c *Client) PutWithRev(key, val string, rev int64) (*client.PutResponse, error) { + if rev == 0 { + return c.Put(key, val) + } + + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + tresp, err := DefalutClient.Txn(ctx). + If(client.Compare(client.Version(key), "=", rev)). + Then(client.OpPut(key, val)). + Commit() + cancel() + if err != nil { + return nil, err + } + + if !tresp.Succeeded { + return nil, ErrValueMayChanged + } + + resp := client.PutResponse(*tresp.Responses[0].GetResponsePut()) + return &resp, nil +} + func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) defer cancel() diff --git a/models/group.go b/models/group.go index 6bfd5ce..7303f08 100644 --- a/models/group.go +++ b/models/group.go @@ -2,6 +2,8 @@ package models import ( "encoding/json" + "errors" + "strings" client "github.com/coreos/etcd/clientv3" @@ -12,12 +14,25 @@ import ( // 结点类型分组 // 注册到 /cronsun/group/ type Group struct { - ID string `json:"-"` + ID string `json:"id"` Name string `json:"name"` NodeIDs []string `json:"nids"` } +func GetGroupById(gid string) (g *Group, err error) { + if len(gid) == 0 { + return + } + resp, err := DefalutClient.Get(conf.Config.Group + gid) + if err != nil || resp.Count == 0 { + return + } + + err = json.Unmarshal(resp.Kvs[0].Value, &g) + return +} + func GetGroups() (groups map[string]*Group, err error) { resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix()) if err != nil { @@ -40,3 +55,29 @@ func GetGroups() (groups map[string]*Group, err error) { } return } + +var ( + ErrEmptyNodeGroupName = errors.New("Name of node group is empty.") +) + +func (g *Group) Key() string { + return conf.Config.Group + g.ID +} + +func (g *Group) Put(rev int64) (*client.PutResponse, error) { + b, err := json.Marshal(g) + if err != nil { + return nil, err + } + + return DefalutClient.PutWithRev(g.Key(), string(b), rev) +} + +func (g *Group) Check() error { + g.Name = strings.TrimSpace(g.Name) + if len(g.Name) == 0 { + return ErrEmptyNodeGroupName + } + + return nil +} diff --git a/models/job.go b/models/job.go index 8bcb9c7..bce8da6 100644 --- a/models/job.go +++ b/models/job.go @@ -2,6 +2,8 @@ package models import ( "encoding/json" + "errors" + "strings" client "github.com/coreos/etcd/clientv3" @@ -9,10 +11,14 @@ import ( "sunteng/cronsun/conf" ) +const ( + DefaultJobGroup = "Default" +) + // 需要执行的 cron cmd 命令 -// 注册到 /cronsun/cmd/ +// 注册到 /cronsun/cmd/groupName/ type Job struct { - ID string `json:"-"` + ID string `json:"id"` Name string `json:"name"` Group string `json:"group"` Command string `json:"cmd"` @@ -90,5 +96,32 @@ func (j *Job) Schedule(id string) ([]string, bool) { } func (j *Job) Run() { - +} + +var ( + ErrEmptyJobName = errors.New("Name of job is empty.") + ErrEmptyJobCommand = errors.New("Command of job is empty.") +) + +func (j *Job) Key() string { + return conf.Config.Cmd + j.Group + "/" + j.ID +} + +func (j *Job) Check() error { + j.Name = strings.TrimSpace(j.Name) + if len(j.Name) == 0 { + return ErrEmptyJobName + } + + j.Group = strings.TrimSpace(j.Group) + if len(j.Group) == 0 { + j.Group = DefaultJobGroup + } + + // 不修改 Command 的内容,简单判断是否为空 + if len(strings.TrimSpace(j.Command)) == 0 { + return ErrEmptyJobCommand + } + + return nil } diff --git a/web/job.go b/web/job.go index a28353c..806799d 100644 --- a/web/job.go +++ b/web/job.go @@ -26,30 +26,30 @@ func (j *Job) Update(w http.ResponseWriter, r *http.Request) { } r.Body.Close() - var creation bool + if err = job.Check(); err != nil { + outJSONError(w, http.StatusBadRequest, err.Error()) + return + } + + var successCode = http.StatusOK if len(job.ID) == 0 { - creation = true + successCode = http.StatusCreated job.ID = models.NextID() } - jobb, err := json.Marshal(job) + b, err := json.Marshal(job) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return } - _, err = models.DefalutClient.Put(path.Join(conf.Config.Cmd, job.Group, job.ID), string(jobb)) + _, err = models.DefalutClient.Put(path.Join(conf.Config.Cmd, job.Group, job.ID), string(b)) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return } - statusCode := http.StatusOK - if creation { - statusCode = http.StatusCreated - } - outJSONWithCode(w, statusCode, nil) - + outJSONWithCode(w, successCode, nil) } var cmdKeyDeepLen = len(strings.Split(conf.Config.Cmd, "/")) diff --git a/web/node.go b/web/node.go index ddb668d..01dea84 100644 --- a/web/node.go +++ b/web/node.go @@ -2,13 +2,10 @@ package web import ( "encoding/json" - "fmt" "net/http" - "path" - "sort" "strings" - "github.com/coreos/etcd/clientv3" + v3 "github.com/coreos/etcd/clientv3" "github.com/gorilla/mux" "sunteng/commons/log" @@ -20,115 +17,76 @@ type Node struct{} var ngKeyDeepLen = len(conf.Config.Group) -func (n *Node) GetGroups(w http.ResponseWriter, r *http.Request) { - resp, err := models.DefalutClient.Get(conf.Config.Group, clientv3.WithPrefix(), clientv3.WithKeysOnly()) - if err != nil { - outJSONError(w, http.StatusInternalServerError, err.Error()) - return - } - - var groupMap = make(map[string]bool, 8) - for i := range resp.Kvs { - ss := strings.Split(string(resp.Kvs[i].Key), "/") - groupMap[ss[ngKeyDeepLen]] = true - } - - var groupList = make([]string, 0, len(groupMap)) - for k := range groupMap { - groupList = append(groupList, k) - } - - sort.Strings(groupList) - outJSON(w, groupList) -} - -func (n *Node) GetGroupByGroupName(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - resp, err := models.DefalutClient.Get(path.Join(conf.Config.Group, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) - if err != nil { - outJSONError(w, http.StatusInternalServerError, err.Error()) - return - } - - var nodeList = make([]*models.Node, 0, resp.Count) - for i := range resp.Kvs { - node := &models.Node{} - err = json.Unmarshal(resp.Kvs[i].Value, &node) - if err != nil { - outJSONError(w, http.StatusInternalServerError, err.Error()) - return - } - nodeList = append(nodeList) - } - - outJSON(w, nodeList) -} - -func (n *Node) JoinGroup(w http.ResponseWriter, r *http.Request) { - ng := []struct { - Nodes []string - Group string - }{} - +func (n *Node) UpdateGroup(w http.ResponseWriter, r *http.Request) { + g := models.Group{} de := json.NewDecoder(r.Body) - err := de.Decode(&ng) - if err != nil { + var err error + if err = de.Decode(&g); err != nil { + outJSONError(w, http.StatusBadRequest, err.Error()) + return + } + defer r.Body.Close() + + var successCode = http.StatusOK + g.ID = strings.TrimSpace(g.ID) + if len(g.ID) == 0 { + successCode = http.StatusCreated + g.ID = models.NextID() + } else { + + } + + if err = g.Check(); err != nil { outJSONError(w, http.StatusBadRequest, err.Error()) return } - gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + // @TODO rev + var rev int64 = 0 + if _, err = g.Put(rev); err != nil { + outJSONError(w, http.StatusBadRequest, err.Error()) + return + } + + outJSONWithCode(w, successCode, nil) +} + +func (n *Node) GetGroups(w http.ResponseWriter, r *http.Request) { + resp, err := models.DefalutClient.Get(conf.Config.Group, v3.WithPrefix(), v3.WithSort(v3.SortByKey, v3.SortAscend)) if err != nil { - log.Errorf("get nodes list failed: %s", err.Error()) outJSONError(w, http.StatusInternalServerError, err.Error()) return } - var nodes map[string]bool - for i := range gresp.Kvs { - ip := strings.TrimLeft(string(gresp.Kvs[i].Key), conf.Config.Proc) - nodes[ip] = true + + var list = make([]*models.Group, 0, resp.Count) + for i := range resp.Kvs { + g := models.Group{} + err = json.Unmarshal(resp.Kvs[i].Value, &g) + if err != nil { + log.Errorf("node.GetGroups(key: %s) error: %s", string(resp.Kvs[i].Key), err.Error()) + outJSONError(w, http.StatusInternalServerError, err.Error()) + return + } + list = append(list, &g) } - var errMsg string - var status int -NGLOOP: - for i := range ng { - g := strings.TrimSpace(ng[i].Group) - if len(g) == 0 { - errMsg = "group name is emtpy." - status = http.StatusBadRequest - break - } + outJSON(w, list) +} - for _, n := range ng[i].Nodes { - n = strings.TrimSpace(n) - if len(n) == 0 { - errMsg = fmt.Sprintf("[%s] node ip is emtpy.", g) - status = http.StatusBadRequest - break NGLOOP - } - - if _, ok := nodes[n]; !ok { - errMsg = fmt.Sprintf("node[%s] not found.", n) - status = http.StatusBadRequest - break NGLOOP - } - - _, err = models.DefalutClient.Put(path.Join(conf.Config.Group, g, n), "") - if err != nil { - errMsg = "join failed: " + err.Error() - status = http.StatusInternalServerError - break NGLOOP - } - } - } - - if len(errMsg) > 0 { - outJSONError(w, status, errMsg) +func (n *Node) GetGroupByGroupId(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + g, err := models.GetGroupById(vars["id"]) + if err != nil { + outJSONError(w, http.StatusInternalServerError, err.Error()) return } - outJSON(w, nil) + if g == nil { + outJSONWithCode(w, http.StatusNotFound, nil) + return + } + outJSON(w, g) } -func (n *Node) LeaveGroup(w http.ResponseWriter, r *http.Request) {} +func (n *Node) DeleteGroup(w http.ResponseWriter, r *http.Request) { +} diff --git a/web/routers.go b/web/routers.go index df94287..26771ae 100644 --- a/web/routers.go +++ b/web/routers.go @@ -25,16 +25,17 @@ func InitRouters() (s *http.Server, err error) { h = BaseHandler{Handle: jobHandler.Update} subrouter.Handle("/job", h).Methods("PUT") + // get node group list h = BaseHandler{Handle: nodeHandler.GetGroups} subrouter.Handle("/node/groups", h).Methods("GET") - - h = BaseHandler{Handle: nodeHandler.GetGroupByGroupName} - subrouter.Handle("/node/group/{name}", h).Methods("GET") - - h = BaseHandler{Handle: nodeHandler.JoinGroup} + // get a node group by group id + h = BaseHandler{Handle: nodeHandler.GetGroupByGroupId} + subrouter.Handle("/node/group/{id}", h).Methods("GET") + // create/update a node group + h = BaseHandler{Handle: nodeHandler.UpdateGroup} subrouter.Handle("/node/group", h).Methods("PUT") - - h = BaseHandler{Handle: nodeHandler.LeaveGroup} + // delete a node group + h = BaseHandler{Handle: nodeHandler.DeleteGroup} subrouter.Handle("/node/group", h).Methods("DELETE") uidir := conf.Config.Web.UIDir