From 28bc0af0c46900384aa1a7d688cb698b46f7e99a Mon Sep 17 00:00:00 2001 From: miraclesu Date: Mon, 9 Jan 2017 17:13:56 +0800 Subject: [PATCH] =?UTF-8?q?models:=20=E5=AE=9A=E4=B9=89=E6=95=B4=E4=BD=93?= =?UTF-8?q?=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- conf/conf.go | 6 +-- conf/files/base.json.sample | 2 +- models/client.go | 81 +++++++++++++++++++++++++++++++++++++ models/group.go | 10 +++++ models/id.go | 19 +++++++++ models/job.go | 17 ++++---- models/node.go | 7 ++-- node/node.go | 6 +-- web/handlers.go | 47 +++++---------------- 9 files changed, 141 insertions(+), 54 deletions(-) create mode 100644 models/client.go create mode 100644 models/group.go create mode 100644 models/id.go diff --git a/conf/conf.go b/conf/conf.go index c5d0990..9fc44dd 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -41,9 +41,9 @@ func Init() error { type Conf struct { Root string // 项目根目录 - Proc string // proc 路径 - Cmd string // cmd 路径 - NodeGroup string // 节点分组 + Proc string // proc 路径 + Cmd string // cmd 路径 + Group string // 节点分组 Ttl int64 // 节点超时时间,单位秒 ReqTimeout int // 请求超时时间,单位秒 diff --git a/conf/files/base.json.sample b/conf/files/base.json.sample index cba79ff..f8ab1a8 100644 --- a/conf/files/base.json.sample +++ b/conf/files/base.json.sample @@ -2,7 +2,7 @@ "Web": "@extend:web.json", "Proc": "/cronsun/proc/", "Cmd": "/cronsun/cmd/", - "NodeGroup": "/cronsun/nodeGroup/", + "Group": "/cronsun/group/", "Ttl": 10, "ReqTimeout": 2, "Log": "@extend:log.json", diff --git a/models/client.go b/models/client.go new file mode 100644 index 0000000..14094d6 --- /dev/null +++ b/models/client.go @@ -0,0 +1,81 @@ +package models + +import ( + "context" + "time" + + client "github.com/coreos/etcd/clientv3" + + "sunteng/cronsun/conf" +) + +var ( + DefalutClient *Client + + initialized bool +) + +func Init() (err error) { + if initialized { + return + } + + if err = initID(); err != nil { + return + } + + if err = conf.Init(); err != nil { + return + } + + if DefalutClient, err = NewClient(conf.Config); err != nil { + return + } + + initialized = true + return +} + +type Client struct { + *client.Client + + reqTimeout time.Duration +} + +func NewClient(cfg *conf.Conf) (c *Client, err error) { + cli, err := client.New(cfg.Etcd) + if err != nil { + return + } + + c = &Client{ + Client: cli, + + reqTimeout: time.Duration(cfg.ReqTimeout) * time.Second, + } + return +} + +func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + defer cancel() + return c.Client.Put(ctx, key, val, opts...) +} + +func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + defer cancel() + return c.Client.Get(ctx, key, opts...) +} + +func (c *Client) Del(key string, opts ...client.OpOption) (*client.DeleteResponse, error) { + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + defer cancel() + return c.Client.Delete(ctx, key, opts...) +} + +func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan { + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + defer cancel() + return c.Client.Watch(ctx, key, opts...) +} diff --git a/models/group.go b/models/group.go new file mode 100644 index 0000000..0f02373 --- /dev/null +++ b/models/group.go @@ -0,0 +1,10 @@ +package models + +// 结点类型分组 +// 注册到 /cronsun/group/ +type Group struct { + ID string `json:"-"` + Name string `json:"name"` + + NodeIDs []string `json:"nids"` +} diff --git a/models/id.go b/models/id.go new file mode 100644 index 0000000..c39520a --- /dev/null +++ b/models/id.go @@ -0,0 +1,19 @@ +package models + +import ( + "encoding/hex" + + "github.com/rogpeppe/fastuuid" +) + +var generator *fastuuid.Generator + +func initID() (err error) { + generator, err = fastuuid.NewGenerator() + return +} + +func NextID() string { + id := generator.Next() + return hex.EncodeToString(id[:4]) +} diff --git a/models/job.go b/models/job.go index 3b074ff..456cb6a 100644 --- a/models/job.go +++ b/models/job.go @@ -1,16 +1,19 @@ package models +// 需要执行的 cron cmd 命令 +// 注册到 /cronsun/cmd/ type Job struct { - Id string `json:"id"` - Group string `json:"group"` + ID string `json:"-"` Name string `json:"name"` - Command string `json:"command"` + Group string `json:"group"` + Command string `json:"cmd"` Rule *JobRule `json:"rule"` + Status int `json:"status"` } type JobRule struct { - Timer []string `json:"timer"` - Nodes []string `json:"nodes"` - Groups []string `json:"groups"` - ExcludeNodes []string `json:"excludeNodes"` + Timer []string `json:"timer"` + NodeIDs []string `json:"nids"` + GroupIDs []string `json:"gids"` + ExcludeNodeIDs []string `json:"exclude_bids"` } diff --git a/models/node.go b/models/node.go index a2bf84d..ad3e588 100644 --- a/models/node.go +++ b/models/node.go @@ -1,7 +1,8 @@ package models +// 执行 cron cmd 的进程 +// 注册到 /cronsun/proc/ type Node struct { - Pid int `json:"pid"` - IP string `json:"ip"` - Port int `json:"port"` + ID string `json:"-"` // ip + PID string `json:"pid"` // 进程 pid } diff --git a/node/node.go b/node/node.go index 7b92831..a4b9a99 100644 --- a/node/node.go +++ b/node/node.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strconv" + "syscall" "time" "golang.org/x/net/context" @@ -13,7 +14,6 @@ import ( "sunteng/commons/log" "sunteng/commons/util" "sunteng/cronsun/conf" - "syscall" ) // Node 执行 cron 命令服务的结构体 @@ -49,9 +49,9 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) { ttl: cfg.Ttl, reqTimeout: time.Duration(cfg.ReqTimeout) * time.Second, - prefix: cfg.Proc + cfg.Sep, + prefix: cfg.Proc, - Key: cfg.Proc + cfg.Sep + ip.String(), + Key: cfg.Proc + ip.String(), PID: strconv.Itoa(os.Getpid()), done: make(chan struct{}), diff --git a/web/handlers.go b/web/handlers.go index dd4fedd..2b16555 100644 --- a/web/handlers.go +++ b/web/handlers.go @@ -1,47 +1,22 @@ package web import ( - "crypto/sha1" - "encoding/hex" "encoding/json" "fmt" "net/http" "path" "sort" - "strconv" "strings" - "time" "github.com/coreos/etcd/clientv3" "github.com/gorilla/mux" - "golang.org/x/net/context" "sunteng/commons/log" "sunteng/cronsun/conf" "sunteng/cronsun/models" ) -var etcdClient *clientv3.Client - -func EtcdInstance() (*clientv3.Client, error) { - if etcdClient != nil { - return etcdClient, nil - } - - if err := conf.Init(); err != nil { - return nil, err - } - - etcdClient, err := clientv3.New(conf.Config.Etcd) - return etcdClient, err -} - func InitRouters() (s *http.Server, err error) { - etcdClient, err = EtcdInstance() - if err != nil { - return nil, err - } - r := mux.NewRouter() subrouter := r.PathPrefix("/v1").Subrouter() @@ -75,7 +50,7 @@ func InitRouters() (s *http.Server, err error) { var cmdKeyDeepLen = len(strings.Split(conf.Config.Cmd, "/")) func getJobGroups(w http.ResponseWriter, r *http.Request) { - resp, err := etcdClient.Get(context.TODO(), conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + resp, err := models.DefalutClient.Get(conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -98,7 +73,7 @@ func getJobGroups(w http.ResponseWriter, r *http.Request) { func getJobsByGroupName(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - resp, err := etcdClient.Get(context.TODO(), path.Join(conf.Config.Cmd, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + resp, err := models.DefalutClient.Get(path.Join(conf.Config.Cmd, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -129,11 +104,9 @@ func updateJob(w http.ResponseWriter, r *http.Request) { r.Body.Close() var creation bool - if len(job.Id) == 0 { + if len(job.ID) == 0 { creation = true - now := time.Now() - h := sha1.Sum([]byte(strconv.FormatInt(now.Unix(), 10) + strconv.FormatInt(now.UnixNano(), 10))) - job.Id = hex.EncodeToString(h[:]) + job.ID = models.NextID() } jobb, err := json.Marshal(job) @@ -142,7 +115,7 @@ func updateJob(w http.ResponseWriter, r *http.Request) { return } - _, err = etcdClient.Put(context.TODO(), path.Join(conf.Config.Cmd, job.Group, job.Id), string(jobb)) + _, err = models.DefalutClient.Put(path.Join(conf.Config.Cmd, job.Group, job.ID), string(jobb)) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -155,10 +128,10 @@ func updateJob(w http.ResponseWriter, r *http.Request) { outJSONWithCode(w, statusCode, nil) } -var ngKeyDeepLen = len(conf.Config.NodeGroup) +var ngKeyDeepLen = len(conf.Config.Group) func getNodeGroups(w http.ResponseWriter, r *http.Request) { - resp, err := etcdClient.Get(context.TODO(), conf.Config.NodeGroup, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + resp, err := models.DefalutClient.Get(conf.Config.Group, clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -181,7 +154,7 @@ func getNodeGroups(w http.ResponseWriter, r *http.Request) { func getNodeGroupByName(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) - resp, err := etcdClient.Get(context.TODO(), path.Join(conf.Config.NodeGroup, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) + resp, err := models.DefalutClient.Get(path.Join(conf.Config.Group, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend)) if err != nil { outJSONError(w, http.StatusInternalServerError, err.Error()) return @@ -214,7 +187,7 @@ func nodeJoinGroup(w http.ResponseWriter, r *http.Request) { return } - gresp, err := etcdClient.Get(context.TODO(), conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly()) if err != nil { log.Errorf("get nodes list failed: %s", err.Error()) outJSONError(w, http.StatusInternalServerError, err.Error()) @@ -251,7 +224,7 @@ NGLOOP: break NGLOOP } - _, err = etcdClient.Put(context.TODO(), path.Join(conf.Config.NodeGroup, g, n), "") + _, err = models.DefalutClient.Put(path.Join(conf.Config.Group, g, n), "") if err != nil { errMsg = "join failed: " + err.Error() status = http.StatusInternalServerError