修改接口

pull/1/head
Doflatango 2017-01-12 16:35:30 +08:00 committed by miraclesu
parent b84ea425ce
commit 385022fa61
7 changed files with 195 additions and 120 deletions

View File

@ -34,6 +34,10 @@ func Init() error {
} }
log.InitConf(&Config.Log) log.InitConf(&Config.Log)
Config.Cmd = cleanKeyPrefix(Config.Cmd)
Config.Proc = cleanKeyPrefix(Config.Proc)
Config.Group = cleanKeyPrefix(Config.Group)
Config.Root = path.Join(Config.Root, "..") Config.Root = path.Join(Config.Root, "..")
initialized = true initialized = true
@ -59,3 +63,15 @@ type webConfig struct {
BindAddr string BindAddr string
UIDir string UIDir string
} }
// 返回前后包含斜杆的 /a/b/ 的前缀
func cleanKeyPrefix(p string) string {
p = path.Clean(p)
if p[0] != '/' {
p = "/" + p
}
p += "/"
return p
}

View File

@ -1,6 +1,7 @@
package models package models
import ( import (
"errors"
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -63,6 +64,31 @@ func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutRespo
return c.Client.Put(ctx, key, val, opts...) return c.Client.Put(ctx, key, val, opts...)
} }
var ErrValueMayChanged = errors.New("The value has been changed by others on this time.")
func (c *Client) PutWithRev(key, val string, rev int64) (*client.PutResponse, error) {
if rev == 0 {
return c.Put(key, val)
}
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
tresp, err := DefalutClient.Txn(ctx).
If(client.Compare(client.Version(key), "=", rev)).
Then(client.OpPut(key, val)).
Commit()
cancel()
if err != nil {
return nil, err
}
if !tresp.Succeeded {
return nil, ErrValueMayChanged
}
resp := client.PutResponse(*tresp.Responses[0].GetResponsePut())
return &resp, nil
}
func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) { func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel() defer cancel()

View File

@ -2,6 +2,8 @@ package models
import ( import (
"encoding/json" "encoding/json"
"errors"
"strings"
client "github.com/coreos/etcd/clientv3" client "github.com/coreos/etcd/clientv3"
@ -12,12 +14,25 @@ import (
// 结点类型分组 // 结点类型分组
// 注册到 /cronsun/group/<id> // 注册到 /cronsun/group/<id>
type Group struct { type Group struct {
ID string `json:"-"` ID string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
NodeIDs []string `json:"nids"` NodeIDs []string `json:"nids"`
} }
func GetGroupById(gid string) (g *Group, err error) {
if len(gid) == 0 {
return
}
resp, err := DefalutClient.Get(conf.Config.Group + gid)
if err != nil || resp.Count == 0 {
return
}
err = json.Unmarshal(resp.Kvs[0].Value, &g)
return
}
func GetGroups() (groups map[string]*Group, err error) { func GetGroups() (groups map[string]*Group, err error) {
resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix()) resp, err := DefalutClient.Get(conf.Config.Group, client.WithPrefix())
if err != nil { if err != nil {
@ -40,3 +55,29 @@ func GetGroups() (groups map[string]*Group, err error) {
} }
return return
} }
var (
ErrEmptyNodeGroupName = errors.New("Name of node group is empty.")
)
func (g *Group) Key() string {
return conf.Config.Group + g.ID
}
func (g *Group) Put(rev int64) (*client.PutResponse, error) {
b, err := json.Marshal(g)
if err != nil {
return nil, err
}
return DefalutClient.PutWithRev(g.Key(), string(b), rev)
}
func (g *Group) Check() error {
g.Name = strings.TrimSpace(g.Name)
if len(g.Name) == 0 {
return ErrEmptyNodeGroupName
}
return nil
}

View File

@ -2,6 +2,8 @@ package models
import ( import (
"encoding/json" "encoding/json"
"errors"
"strings"
client "github.com/coreos/etcd/clientv3" client "github.com/coreos/etcd/clientv3"
@ -9,10 +11,14 @@ import (
"sunteng/cronsun/conf" "sunteng/cronsun/conf"
) )
const (
DefaultJobGroup = "Default"
)
// 需要执行的 cron cmd 命令 // 需要执行的 cron cmd 命令
// 注册到 /cronsun/cmd/<id> // 注册到 /cronsun/cmd/groupName/<id>
type Job struct { type Job struct {
ID string `json:"-"` ID string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
Group string `json:"group"` Group string `json:"group"`
Command string `json:"cmd"` Command string `json:"cmd"`
@ -90,5 +96,32 @@ func (j *Job) Schedule(id string) ([]string, bool) {
} }
func (j *Job) Run() { func (j *Job) Run() {
}
var (
ErrEmptyJobName = errors.New("Name of job is empty.")
ErrEmptyJobCommand = errors.New("Command of job is empty.")
)
func (j *Job) Key() string {
return conf.Config.Cmd + j.Group + "/" + j.ID
}
func (j *Job) Check() error {
j.Name = strings.TrimSpace(j.Name)
if len(j.Name) == 0 {
return ErrEmptyJobName
}
j.Group = strings.TrimSpace(j.Group)
if len(j.Group) == 0 {
j.Group = DefaultJobGroup
}
// 不修改 Command 的内容,简单判断是否为空
if len(strings.TrimSpace(j.Command)) == 0 {
return ErrEmptyJobCommand
}
return nil
} }

View File

@ -26,30 +26,30 @@ func (j *Job) Update(w http.ResponseWriter, r *http.Request) {
} }
r.Body.Close() r.Body.Close()
var creation bool if err = job.Check(); err != nil {
outJSONError(w, http.StatusBadRequest, err.Error())
return
}
var successCode = http.StatusOK
if len(job.ID) == 0 { if len(job.ID) == 0 {
creation = true successCode = http.StatusCreated
job.ID = models.NextID() job.ID = models.NextID()
} }
jobb, err := json.Marshal(job) b, err := json.Marshal(job)
if err != nil { if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error()) outJSONError(w, http.StatusInternalServerError, err.Error())
return return
} }
_, err = models.DefalutClient.Put(path.Join(conf.Config.Cmd, job.Group, job.ID), string(jobb)) _, err = models.DefalutClient.Put(path.Join(conf.Config.Cmd, job.Group, job.ID), string(b))
if err != nil { if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error()) outJSONError(w, http.StatusInternalServerError, err.Error())
return return
} }
statusCode := http.StatusOK outJSONWithCode(w, successCode, nil)
if creation {
statusCode = http.StatusCreated
}
outJSONWithCode(w, statusCode, nil)
} }
var cmdKeyDeepLen = len(strings.Split(conf.Config.Cmd, "/")) var cmdKeyDeepLen = len(strings.Split(conf.Config.Cmd, "/"))

View File

@ -2,13 +2,10 @@ package web
import ( import (
"encoding/json" "encoding/json"
"fmt"
"net/http" "net/http"
"path"
"sort"
"strings" "strings"
"github.com/coreos/etcd/clientv3" v3 "github.com/coreos/etcd/clientv3"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"sunteng/commons/log" "sunteng/commons/log"
@ -20,115 +17,76 @@ type Node struct{}
var ngKeyDeepLen = len(conf.Config.Group) var ngKeyDeepLen = len(conf.Config.Group)
func (n *Node) GetGroups(w http.ResponseWriter, r *http.Request) { func (n *Node) UpdateGroup(w http.ResponseWriter, r *http.Request) {
resp, err := models.DefalutClient.Get(conf.Config.Group, clientv3.WithPrefix(), clientv3.WithKeysOnly()) g := models.Group{}
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
}
var groupMap = make(map[string]bool, 8)
for i := range resp.Kvs {
ss := strings.Split(string(resp.Kvs[i].Key), "/")
groupMap[ss[ngKeyDeepLen]] = true
}
var groupList = make([]string, 0, len(groupMap))
for k := range groupMap {
groupList = append(groupList, k)
}
sort.Strings(groupList)
outJSON(w, groupList)
}
func (n *Node) GetGroupByGroupName(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
resp, err := models.DefalutClient.Get(path.Join(conf.Config.Group, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
}
var nodeList = make([]*models.Node, 0, resp.Count)
for i := range resp.Kvs {
node := &models.Node{}
err = json.Unmarshal(resp.Kvs[i].Value, &node)
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
}
nodeList = append(nodeList)
}
outJSON(w, nodeList)
}
func (n *Node) JoinGroup(w http.ResponseWriter, r *http.Request) {
ng := []struct {
Nodes []string
Group string
}{}
de := json.NewDecoder(r.Body) de := json.NewDecoder(r.Body)
err := de.Decode(&ng) var err error
if err != nil { if err = de.Decode(&g); err != nil {
outJSONError(w, http.StatusBadRequest, err.Error())
return
}
defer r.Body.Close()
var successCode = http.StatusOK
g.ID = strings.TrimSpace(g.ID)
if len(g.ID) == 0 {
successCode = http.StatusCreated
g.ID = models.NextID()
} else {
}
if err = g.Check(); err != nil {
outJSONError(w, http.StatusBadRequest, err.Error()) outJSONError(w, http.StatusBadRequest, err.Error())
return return
} }
gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly()) // @TODO rev
var rev int64 = 0
if _, err = g.Put(rev); err != nil {
outJSONError(w, http.StatusBadRequest, err.Error())
return
}
outJSONWithCode(w, successCode, nil)
}
func (n *Node) GetGroups(w http.ResponseWriter, r *http.Request) {
resp, err := models.DefalutClient.Get(conf.Config.Group, v3.WithPrefix(), v3.WithSort(v3.SortByKey, v3.SortAscend))
if err != nil { if err != nil {
log.Errorf("get nodes list failed: %s", err.Error())
outJSONError(w, http.StatusInternalServerError, err.Error()) outJSONError(w, http.StatusInternalServerError, err.Error())
return return
} }
var nodes map[string]bool
for i := range gresp.Kvs { var list = make([]*models.Group, 0, resp.Count)
ip := strings.TrimLeft(string(gresp.Kvs[i].Key), conf.Config.Proc) for i := range resp.Kvs {
nodes[ip] = true g := models.Group{}
err = json.Unmarshal(resp.Kvs[i].Value, &g)
if err != nil {
log.Errorf("node.GetGroups(key: %s) error: %s", string(resp.Kvs[i].Key), err.Error())
outJSONError(w, http.StatusInternalServerError, err.Error())
return
}
list = append(list, &g)
} }
var errMsg string outJSON(w, list)
var status int }
NGLOOP:
for i := range ng {
g := strings.TrimSpace(ng[i].Group)
if len(g) == 0 {
errMsg = "group name is emtpy."
status = http.StatusBadRequest
break
}
for _, n := range ng[i].Nodes { func (n *Node) GetGroupByGroupId(w http.ResponseWriter, r *http.Request) {
n = strings.TrimSpace(n) vars := mux.Vars(r)
if len(n) == 0 { g, err := models.GetGroupById(vars["id"])
errMsg = fmt.Sprintf("[%s] node ip is emtpy.", g) if err != nil {
status = http.StatusBadRequest outJSONError(w, http.StatusInternalServerError, err.Error())
break NGLOOP
}
if _, ok := nodes[n]; !ok {
errMsg = fmt.Sprintf("node[%s] not found.", n)
status = http.StatusBadRequest
break NGLOOP
}
_, err = models.DefalutClient.Put(path.Join(conf.Config.Group, g, n), "")
if err != nil {
errMsg = "join failed: " + err.Error()
status = http.StatusInternalServerError
break NGLOOP
}
}
}
if len(errMsg) > 0 {
outJSONError(w, status, errMsg)
return return
} }
outJSON(w, nil) if g == nil {
outJSONWithCode(w, http.StatusNotFound, nil)
return
}
outJSON(w, g)
} }
func (n *Node) LeaveGroup(w http.ResponseWriter, r *http.Request) {} func (n *Node) DeleteGroup(w http.ResponseWriter, r *http.Request) {
}

View File

@ -25,16 +25,17 @@ func InitRouters() (s *http.Server, err error) {
h = BaseHandler{Handle: jobHandler.Update} h = BaseHandler{Handle: jobHandler.Update}
subrouter.Handle("/job", h).Methods("PUT") subrouter.Handle("/job", h).Methods("PUT")
// get node group list
h = BaseHandler{Handle: nodeHandler.GetGroups} h = BaseHandler{Handle: nodeHandler.GetGroups}
subrouter.Handle("/node/groups", h).Methods("GET") subrouter.Handle("/node/groups", h).Methods("GET")
// get a node group by group id
h = BaseHandler{Handle: nodeHandler.GetGroupByGroupName} h = BaseHandler{Handle: nodeHandler.GetGroupByGroupId}
subrouter.Handle("/node/group/{name}", h).Methods("GET") subrouter.Handle("/node/group/{id}", h).Methods("GET")
// create/update a node group
h = BaseHandler{Handle: nodeHandler.JoinGroup} h = BaseHandler{Handle: nodeHandler.UpdateGroup}
subrouter.Handle("/node/group", h).Methods("PUT") subrouter.Handle("/node/group", h).Methods("PUT")
// delete a node group
h = BaseHandler{Handle: nodeHandler.LeaveGroup} h = BaseHandler{Handle: nodeHandler.DeleteGroup}
subrouter.Handle("/node/group", h).Methods("DELETE") subrouter.Handle("/node/group", h).Methods("DELETE")
uidir := conf.Config.Web.UIDir uidir := conf.Config.Web.UIDir