models: 定义整体结构

pull/1/head
miraclesu 8 years ago
parent 413254b9d2
commit 28bc0af0c4

@ -41,9 +41,9 @@ func Init() error {
type Conf struct {
Root string // 项目根目录
Proc string // proc 路径
Cmd string // cmd 路径
NodeGroup string // 节点分组
Proc string // proc 路径
Cmd string // cmd 路径
Group string // 节点分组
Ttl int64 // 节点超时时间,单位秒
ReqTimeout int // 请求超时时间,单位秒

@ -2,7 +2,7 @@
"Web": "@extend:web.json",
"Proc": "/cronsun/proc/",
"Cmd": "/cronsun/cmd/",
"NodeGroup": "/cronsun/nodeGroup/",
"Group": "/cronsun/group/",
"Ttl": 10,
"ReqTimeout": 2,
"Log": "@extend:log.json",

@ -0,0 +1,81 @@
package models
import (
"context"
"time"
client "github.com/coreos/etcd/clientv3"
"sunteng/cronsun/conf"
)
var (
DefalutClient *Client
initialized bool
)
func Init() (err error) {
if initialized {
return
}
if err = initID(); err != nil {
return
}
if err = conf.Init(); err != nil {
return
}
if DefalutClient, err = NewClient(conf.Config); err != nil {
return
}
initialized = true
return
}
type Client struct {
*client.Client
reqTimeout time.Duration
}
func NewClient(cfg *conf.Conf) (c *Client, err error) {
cli, err := client.New(cfg.Etcd)
if err != nil {
return
}
c = &Client{
Client: cli,
reqTimeout: time.Duration(cfg.ReqTimeout) * time.Second,
}
return
}
func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Put(ctx, key, val, opts...)
}
func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Get(ctx, key, opts...)
}
func (c *Client) Del(key string, opts ...client.OpOption) (*client.DeleteResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Delete(ctx, key, opts...)
}
func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Watch(ctx, key, opts...)
}

@ -0,0 +1,10 @@
package models
// 结点类型分组
// 注册到 /cronsun/group/<id>
type Group struct {
ID string `json:"-"`
Name string `json:"name"`
NodeIDs []string `json:"nids"`
}

@ -0,0 +1,19 @@
package models
import (
"encoding/hex"
"github.com/rogpeppe/fastuuid"
)
var generator *fastuuid.Generator
func initID() (err error) {
generator, err = fastuuid.NewGenerator()
return
}
func NextID() string {
id := generator.Next()
return hex.EncodeToString(id[:4])
}

@ -1,16 +1,19 @@
package models
// 需要执行的 cron cmd 命令
// 注册到 /cronsun/cmd/<id>
type Job struct {
Id string `json:"id"`
Group string `json:"group"`
ID string `json:"-"`
Name string `json:"name"`
Command string `json:"command"`
Group string `json:"group"`
Command string `json:"cmd"`
Rule *JobRule `json:"rule"`
Status int `json:"status"`
}
type JobRule struct {
Timer []string `json:"timer"`
Nodes []string `json:"nodes"`
Groups []string `json:"groups"`
ExcludeNodes []string `json:"excludeNodes"`
Timer []string `json:"timer"`
NodeIDs []string `json:"nids"`
GroupIDs []string `json:"gids"`
ExcludeNodeIDs []string `json:"exclude_bids"`
}

@ -1,7 +1,8 @@
package models
// 执行 cron cmd 的进程
// 注册到 /cronsun/proc/<id>
type Node struct {
Pid int `json:"pid"`
IP string `json:"ip"`
Port int `json:"port"`
ID string `json:"-"` // ip
PID string `json:"pid"` // 进程 pid
}

@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strconv"
"syscall"
"time"
"golang.org/x/net/context"
@ -13,7 +14,6 @@ import (
"sunteng/commons/log"
"sunteng/commons/util"
"sunteng/cronsun/conf"
"syscall"
)
// Node 执行 cron 命令服务的结构体
@ -49,9 +49,9 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) {
ttl: cfg.Ttl,
reqTimeout: time.Duration(cfg.ReqTimeout) * time.Second,
prefix: cfg.Proc + cfg.Sep,
prefix: cfg.Proc,
Key: cfg.Proc + cfg.Sep + ip.String(),
Key: cfg.Proc + ip.String(),
PID: strconv.Itoa(os.Getpid()),
done: make(chan struct{}),

@ -1,47 +1,22 @@
package web
import (
"crypto/sha1"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"path"
"sort"
"strconv"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/gorilla/mux"
"golang.org/x/net/context"
"sunteng/commons/log"
"sunteng/cronsun/conf"
"sunteng/cronsun/models"
)
var etcdClient *clientv3.Client
func EtcdInstance() (*clientv3.Client, error) {
if etcdClient != nil {
return etcdClient, nil
}
if err := conf.Init(); err != nil {
return nil, err
}
etcdClient, err := clientv3.New(conf.Config.Etcd)
return etcdClient, err
}
func InitRouters() (s *http.Server, err error) {
etcdClient, err = EtcdInstance()
if err != nil {
return nil, err
}
r := mux.NewRouter()
subrouter := r.PathPrefix("/v1").Subrouter()
@ -75,7 +50,7 @@ func InitRouters() (s *http.Server, err error) {
var cmdKeyDeepLen = len(strings.Split(conf.Config.Cmd, "/"))
func getJobGroups(w http.ResponseWriter, r *http.Request) {
resp, err := etcdClient.Get(context.TODO(), conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly())
resp, err := models.DefalutClient.Get(conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
@ -98,7 +73,7 @@ func getJobGroups(w http.ResponseWriter, r *http.Request) {
func getJobsByGroupName(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
resp, err := etcdClient.Get(context.TODO(), path.Join(conf.Config.Cmd, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
resp, err := models.DefalutClient.Get(path.Join(conf.Config.Cmd, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
@ -129,11 +104,9 @@ func updateJob(w http.ResponseWriter, r *http.Request) {
r.Body.Close()
var creation bool
if len(job.Id) == 0 {
if len(job.ID) == 0 {
creation = true
now := time.Now()
h := sha1.Sum([]byte(strconv.FormatInt(now.Unix(), 10) + strconv.FormatInt(now.UnixNano(), 10)))
job.Id = hex.EncodeToString(h[:])
job.ID = models.NextID()
}
jobb, err := json.Marshal(job)
@ -142,7 +115,7 @@ func updateJob(w http.ResponseWriter, r *http.Request) {
return
}
_, err = etcdClient.Put(context.TODO(), path.Join(conf.Config.Cmd, job.Group, job.Id), string(jobb))
_, err = models.DefalutClient.Put(path.Join(conf.Config.Cmd, job.Group, job.ID), string(jobb))
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
@ -155,10 +128,10 @@ func updateJob(w http.ResponseWriter, r *http.Request) {
outJSONWithCode(w, statusCode, nil)
}
var ngKeyDeepLen = len(conf.Config.NodeGroup)
var ngKeyDeepLen = len(conf.Config.Group)
func getNodeGroups(w http.ResponseWriter, r *http.Request) {
resp, err := etcdClient.Get(context.TODO(), conf.Config.NodeGroup, clientv3.WithPrefix(), clientv3.WithKeysOnly())
resp, err := models.DefalutClient.Get(conf.Config.Group, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
@ -181,7 +154,7 @@ func getNodeGroups(w http.ResponseWriter, r *http.Request) {
func getNodeGroupByName(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
resp, err := etcdClient.Get(context.TODO(), path.Join(conf.Config.NodeGroup, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
resp, err := models.DefalutClient.Get(path.Join(conf.Config.Group, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
if err != nil {
outJSONError(w, http.StatusInternalServerError, err.Error())
return
@ -214,7 +187,7 @@ func nodeJoinGroup(w http.ResponseWriter, r *http.Request) {
return
}
gresp, err := etcdClient.Get(context.TODO(), conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly())
gresp, err := models.DefalutClient.Get(conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
log.Errorf("get nodes list failed: %s", err.Error())
outJSONError(w, http.StatusInternalServerError, err.Error())
@ -251,7 +224,7 @@ NGLOOP:
break NGLOOP
}
_, err = etcdClient.Put(context.TODO(), path.Join(conf.Config.NodeGroup, g, n), "")
_, err = models.DefalutClient.Put(path.Join(conf.Config.Group, g, n), "")
if err != nil {
errMsg = "join failed: " + err.Error()
status = http.StatusInternalServerError

Loading…
Cancel
Save