mirror of https://github.com/shunfei/cronsun
web
parent
3bc0c0558e
commit
1196bed1a2
|
@ -0,0 +1,38 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/cockroachdb/cmux"
|
||||
|
||||
"sunteng/commons/event"
|
||||
"sunteng/commons/log"
|
||||
"sunteng/cronsun"
|
||||
"sunteng/cronsun/conf"
|
||||
"sunteng/cronsun/web"
|
||||
)
|
||||
|
||||
func main() {
|
||||
l, err := net.Listen("tcp", conf.Config.Web.BindAddr)
|
||||
if err != nil {
|
||||
cronsun.Fatalln(err)
|
||||
}
|
||||
|
||||
// Create a cmux.
|
||||
m := cmux.New(l)
|
||||
httpL := m.Match(cmux.HTTP1Fast())
|
||||
httpServer, err := web.InitRouters()
|
||||
if err != nil {
|
||||
cronsun.Fatalln(err)
|
||||
}
|
||||
|
||||
go httpServer.Serve(httpL)
|
||||
|
||||
log.Noticef("cronsun web server started on %s, Ctrl+C or send kill sign to exit", conf.Config.Web.BindAddr)
|
||||
// 注册退出事件
|
||||
// event.On(event.EXIT, n.Stop)
|
||||
// 监听退出信号
|
||||
event.Wait()
|
||||
event.Emit(event.EXIT, nil)
|
||||
log.Notice("exit success")
|
||||
}
|
20
conf/conf.go
20
conf/conf.go
|
@ -12,10 +12,15 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
Config = new(Conf)
|
||||
Config = new(Conf)
|
||||
initialized bool
|
||||
)
|
||||
|
||||
func Init() error {
|
||||
if initialized {
|
||||
return nil
|
||||
}
|
||||
|
||||
Config.Root = util.CurDir()
|
||||
|
||||
confFile := path.Join(Config.Root, "files", "base.json")
|
||||
|
@ -29,19 +34,26 @@ func Init() error {
|
|||
}
|
||||
log.InitConf(&Config.Log)
|
||||
|
||||
initialized = true
|
||||
return nil
|
||||
}
|
||||
|
||||
type Conf struct {
|
||||
Root string // 项目根目录
|
||||
|
||||
Proc string // proc 路径
|
||||
Cmd string // cmd 路径
|
||||
Sep string // etcd key 的连接符
|
||||
Proc string // proc 路径
|
||||
Cmd string // cmd 路径
|
||||
NodeGroup string // 节点分组
|
||||
|
||||
Ttl int64 // 节点超时时间,单位秒
|
||||
ReqTimeout int // 请求超时时间,单位秒
|
||||
|
||||
Log log.Config
|
||||
Etcd client.Config
|
||||
Web webConfig
|
||||
}
|
||||
|
||||
type webConfig struct {
|
||||
BindAddr string
|
||||
UIDir string
|
||||
}
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
{
|
||||
"Proc": "/cronsun/proc",
|
||||
"Cmd": "/cronsun/cmd",
|
||||
"Sep": "/",
|
||||
"Web": "@extend:web.json",
|
||||
"Proc": "/cronsun/proc/",
|
||||
"Cmd": "/cronsun/cmd/",
|
||||
"NodeGroup": "/cronsun/nodeGroup/",
|
||||
"Ttl": 10,
|
||||
"ReqTimeout": 2,
|
||||
"Log": "@extend:log.json",
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
{
|
||||
BindAddr: ":7079",
|
||||
UIDir: ""
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package cronsun
|
||||
|
||||
import (
|
||||
"sunteng/cronsun/conf"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
)
|
||||
|
||||
var etcdClient *clientv3.Client
|
||||
|
||||
func EtcdInstance() (*clientv3.Client, error) {
|
||||
if etcdClient != nil {
|
||||
return etcdClient, nil
|
||||
}
|
||||
|
||||
if err := conf.Init(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
etcdClient, err := clientv3.New(conf.Config.Etcd)
|
||||
return etcdClient, err
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package cronsun
|
||||
|
||||
type Job struct {
|
||||
Id string `json:"id"`
|
||||
Group string `json:"group"`
|
||||
Name string `json:"name"`
|
||||
Command string `json:"command"`
|
||||
Rule *JobRule `json:"rule"`
|
||||
}
|
||||
|
||||
type JobRule struct {
|
||||
Timer []string `json:"timer"`
|
||||
Nodes []string `json:"nodes"`
|
||||
Groups []string `json:"groups"`
|
||||
ExcludeNodes []string `json:"excludeNodes"`
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
package cronsun
|
||||
|
||||
type Node struct {
|
||||
Pid int `json:"pid"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
}
|
|
@ -0,0 +1,62 @@
|
|||
package web
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"runtime/debug"
|
||||
|
||||
"sunteng/commons/log"
|
||||
)
|
||||
|
||||
type BaseHandler struct {
|
||||
Handle func(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
|
||||
func (b BaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() {
|
||||
// handle all the error
|
||||
err_ := recover()
|
||||
if err_ == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var stack string
|
||||
var buf bytes.Buffer
|
||||
buf.Write(debug.Stack())
|
||||
stack = buf.String()
|
||||
|
||||
outJSONError(w, http.StatusInternalServerError, "Internal Server Error")
|
||||
|
||||
log.Errorf("%v\n\n%s\n", err_, stack)
|
||||
return
|
||||
}()
|
||||
|
||||
b.Handle(w, r)
|
||||
}
|
||||
|
||||
func outJSONWithCode(w http.ResponseWriter, httpCode int, data interface{}) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
s := ""
|
||||
b, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
s = `{"error":"json.Marshal error"}`
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
} else {
|
||||
s = string(b)
|
||||
w.WriteHeader(httpCode)
|
||||
}
|
||||
fmt.Fprint(w, s)
|
||||
}
|
||||
|
||||
func outJSON(w http.ResponseWriter, data interface{}) {
|
||||
outJSONWithCode(w, http.StatusOK, data)
|
||||
}
|
||||
|
||||
func outJSONError(w http.ResponseWriter, httpCode int, msg string) {
|
||||
r := map[string]string{
|
||||
"error": msg,
|
||||
}
|
||||
outJSONWithCode(w, httpCode, r)
|
||||
}
|
|
@ -0,0 +1,258 @@
|
|||
package web
|
||||
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"path"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/gorilla/mux"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"fmt"
|
||||
"sunteng/commons/log"
|
||||
"sunteng/cronsun"
|
||||
"sunteng/cronsun/conf"
|
||||
)
|
||||
|
||||
var etcdClient *clientv3.Client
|
||||
|
||||
func InitRouters() (s *http.Server, err error) {
|
||||
etcdClient, err = cronsun.EtcdInstance()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r := mux.NewRouter()
|
||||
subrouter := r.PathPrefix("/v1").Subrouter()
|
||||
|
||||
h := BaseHandler{Handle: getJobGroups}
|
||||
subrouter.Handle("/job/groups", h).Methods("GET")
|
||||
|
||||
h = BaseHandler{Handle: getJobsByGroupName}
|
||||
subrouter.Handle("/job/group/{name}", h).Methods("GET")
|
||||
|
||||
h = BaseHandler{Handle: updateJob}
|
||||
subrouter.Handle("/job", h).Methods("PUT")
|
||||
|
||||
h = BaseHandler{Handle: getNodeGroups}
|
||||
subrouter.Handle("/node/groups", h).Methods("GET")
|
||||
|
||||
h = BaseHandler{Handle: getNodeGroupByName}
|
||||
subrouter.Handle("/node/group/{name}", h).Methods("GET")
|
||||
|
||||
h = BaseHandler{Handle: nodeJoinGroup}
|
||||
subrouter.Handle("/node/group", h).Methods("PUT")
|
||||
|
||||
h = BaseHandler{Handle: nodeLeaveGroup}
|
||||
subrouter.Handle("/node/group", h).Methods("DELETE")
|
||||
|
||||
s = &http.Server{
|
||||
Handler: r,
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
var cmdKeyDeepLen = len(strings.Split(conf.Config.Cmd, "/"))
|
||||
|
||||
func getJobGroups(w http.ResponseWriter, r *http.Request) {
|
||||
resp, err := etcdClient.Get(context.TODO(), conf.Config.Cmd, clientv3.WithPrefix(), clientv3.WithKeysOnly())
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var groupMap = make(map[string]bool, 8)
|
||||
for i := range resp.Kvs {
|
||||
ss := strings.Split(string(resp.Kvs[i].Key), "/")
|
||||
groupMap[ss[cmdKeyDeepLen]] = true
|
||||
}
|
||||
|
||||
var groupList = make([]string, 0, len(groupMap))
|
||||
for k := range groupMap {
|
||||
groupList = append(groupList, k)
|
||||
}
|
||||
|
||||
sort.Strings(groupList)
|
||||
outJSON(w, groupList)
|
||||
}
|
||||
|
||||
func getJobsByGroupName(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
resp, err := etcdClient.Get(context.TODO(), path.Join(conf.Config.Cmd, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var jobList = make([]*cronsun.Job, 0, resp.Count)
|
||||
for i := range resp.Kvs {
|
||||
job := &cronsun.Job{}
|
||||
err = json.Unmarshal(resp.Kvs[i].Value, &job)
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
jobList = append(jobList)
|
||||
}
|
||||
|
||||
outJSON(w, jobList)
|
||||
}
|
||||
|
||||
func updateJob(w http.ResponseWriter, r *http.Request) {
|
||||
job := &cronsun.Job{}
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
err := decoder.Decode(&job)
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
r.Body.Close()
|
||||
|
||||
var creation bool
|
||||
if len(job.Id) == 0 {
|
||||
creation = true
|
||||
now := time.Now()
|
||||
h := sha1.Sum([]byte(strconv.FormatInt(now.Unix(), 10) + strconv.FormatInt(now.UnixNano(), 10)))
|
||||
job.Id = hex.EncodeToString(h[:])
|
||||
}
|
||||
|
||||
jobb, err := json.Marshal(job)
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
_, err = etcdClient.Put(context.TODO(), path.Join(conf.Config.Cmd, job.Group, job.Id), string(jobb))
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
statusCode := http.StatusOK
|
||||
if creation {
|
||||
statusCode = http.StatusCreated
|
||||
}
|
||||
outJSONWithCode(w, statusCode, nil)
|
||||
}
|
||||
|
||||
var ngKeyDeepLen = len(conf.Config.NodeGroup)
|
||||
|
||||
func getNodeGroups(w http.ResponseWriter, r *http.Request) {
|
||||
resp, err := etcdClient.Get(context.TODO(), conf.Config.NodeGroup, clientv3.WithPrefix(), clientv3.WithKeysOnly())
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var groupMap = make(map[string]bool, 8)
|
||||
for i := range resp.Kvs {
|
||||
ss := strings.Split(string(resp.Kvs[i].Key), "/")
|
||||
groupMap[ss[ngKeyDeepLen]] = true
|
||||
}
|
||||
|
||||
var groupList = make([]string, 0, len(groupMap))
|
||||
for k := range groupMap {
|
||||
groupList = append(groupList, k)
|
||||
}
|
||||
|
||||
sort.Strings(groupList)
|
||||
outJSON(w, groupList)
|
||||
}
|
||||
|
||||
func getNodeGroupByName(w http.ResponseWriter, r *http.Request) {
|
||||
vars := mux.Vars(r)
|
||||
resp, err := etcdClient.Get(context.TODO(), path.Join(conf.Config.NodeGroup, vars["name"]), clientv3.WithPrefix(), clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var nodeList = make([]*cronsun.Node, 0, resp.Count)
|
||||
for i := range resp.Kvs {
|
||||
node := &cronsun.Node{}
|
||||
err = json.Unmarshal(resp.Kvs[i].Value, &node)
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
nodeList = append(nodeList)
|
||||
}
|
||||
|
||||
outJSON(w, nodeList)
|
||||
}
|
||||
|
||||
func nodeJoinGroup(w http.ResponseWriter, r *http.Request) {
|
||||
ng := []struct {
|
||||
Nodes []string
|
||||
Group string
|
||||
}{}
|
||||
|
||||
de := json.NewDecoder(r.Body)
|
||||
err := de.Decode(&ng)
|
||||
if err != nil {
|
||||
outJSONError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
gresp, err := etcdClient.Get(context.TODO(), conf.Config.Proc, clientv3.WithPrefix(), clientv3.WithKeysOnly())
|
||||
if err != nil {
|
||||
log.Errorf("get nodes list failed: %s", err.Error())
|
||||
outJSONError(w, http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
var nodes map[string]bool
|
||||
for i := range gresp.Kvs {
|
||||
ip := strings.TrimLeft(string(gresp.Kvs[i].Key), conf.Config.Proc)
|
||||
nodes[ip] = true
|
||||
}
|
||||
|
||||
var errMsg string
|
||||
var status int
|
||||
NGLOOP:
|
||||
for i := range ng {
|
||||
g := strings.TrimSpace(ng[i].Group)
|
||||
if len(g) == 0 {
|
||||
errMsg = "group name is emtpy."
|
||||
status = http.StatusBadRequest
|
||||
break
|
||||
}
|
||||
|
||||
for _, n := range ng[i].Nodes {
|
||||
n = strings.TrimSpace(n)
|
||||
if len(n) == 0 {
|
||||
errMsg = fmt.Sprintf("[%s] node ip is emtpy.", g)
|
||||
status = http.StatusBadRequest
|
||||
break NGLOOP
|
||||
}
|
||||
|
||||
if _, ok := nodes[n]; !ok {
|
||||
errMsg = fmt.Sprintf("node[%s] not found.", n)
|
||||
status = http.StatusBadRequest
|
||||
break NGLOOP
|
||||
}
|
||||
|
||||
_, err = etcdClient.Put(context.TODO(), path.Join(conf.Config.NodeGroup, g, n), "")
|
||||
if err != nil {
|
||||
errMsg = "join failed: " + err.Error()
|
||||
status = http.StatusInternalServerError
|
||||
break NGLOOP
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(errMsg) > 0 {
|
||||
outJSONError(w, status, errMsg)
|
||||
return
|
||||
}
|
||||
|
||||
outJSON(w, nil)
|
||||
}
|
||||
|
||||
func nodeLeaveGroup(w http.ResponseWriter, r *http.Request) {}
|
|
@ -0,0 +1 @@
|
|||
package web
|
Loading…
Reference in New Issue