node: 调整 node 结构

pull/1/head
miraclesu 8 years ago
parent 28bc0af0c4
commit 45e7a8b1f5

@ -10,6 +10,7 @@ import (
"sunteng/commons/log"
"sunteng/cronsun/conf"
"sunteng/cronsun/models"
"sunteng/cronsun/node"
)
@ -23,7 +24,7 @@ func main() {
//set cpu usage
runtime.GOMAXPROCS(*gomax)
if err := conf.Init(); err != nil {
if err := models.Init(); err != nil {
log.Error(err.Error())
return
}

@ -8,10 +8,16 @@ import (
"sunteng/commons/event"
"sunteng/commons/log"
"sunteng/cronsun/conf"
"sunteng/cronsun/models"
"sunteng/cronsun/web"
)
func main() {
if err := models.Init(); err != nil {
log.Error(err.Error())
return
}
l, err := net.Listen("tcp", conf.Config.Web.BindAddr)
if err != nil {
log.Error(err.Error())

@ -68,7 +68,7 @@ func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse,
return c.Client.Get(ctx, key, opts...)
}
func (c *Client) Del(key string, opts ...client.OpOption) (*client.DeleteResponse, error) {
func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
defer cancel()
return c.Client.Delete(ctx, key, opts...)

@ -1,8 +1,62 @@
package models
import (
"os"
"strconv"
"syscall"
client "github.com/coreos/etcd/clientv3"
"sunteng/cronsun/conf"
)
// 执行 cron cmd 的进程
// 注册到 /cronsun/proc/<id>
type Node struct {
ID string `json:"-"` // ip
PID string `json:"pid"` // 进程 pid
}
func (n *Node) String() string {
return "node[" + n.ID + "] pid[" + n.PID + "]"
}
func (n *Node) Put(opts ...client.OpOption) (*client.PutResponse, error) {
return DefalutClient.Put(conf.Config.Proc+n.ID, n.PID, opts...)
}
func (n *Node) Del() (*client.DeleteResponse, error) {
return DefalutClient.Delete(conf.Config.Proc+n.ID, client.WithFromKey())
}
// 判断 node 是否已注册到 etcd
// 存在则返回进行 pid不存在返回 -1
func (n *Node) Exist() (pid int, err error) {
resp, err := DefalutClient.Get(conf.Config.Proc+n.ID, client.WithFromKey())
if err != nil {
return
}
if len(resp.Kvs) == 0 {
return -1, nil
}
if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil {
if _, err = DefalutClient.Delete(conf.Config.Proc+n.ID, client.WithFromKey()); err != nil {
return
}
return -1, nil
}
p, err := os.FindProcess(pid)
if err != nil {
return -1, nil
}
// TODO: 暂时不考虑 linux/unix 以外的系统
if p != nil && p.Signal(syscall.Signal(0)) == nil {
return
}
return -1, nil
}

@ -4,7 +4,6 @@ import (
"fmt"
"os"
"strconv"
"syscall"
"time"
"golang.org/x/net/context"
@ -14,18 +13,15 @@ import (
"sunteng/commons/log"
"sunteng/commons/util"
"sunteng/cronsun/conf"
"sunteng/cronsun/models"
)
// Node 执行 cron 命令服务的结构体
type Node struct {
*client.Client
*models.Client
*models.Node
ttl int64
reqTimeout time.Duration
prefix string
Key string
PID string
ttl int64
lID client.LeaseID // lease id
lch <-chan *client.LeaseKeepAliveResponse
@ -39,39 +35,28 @@ func NewNode(cfg *conf.Conf) (n *Node, err error) {
return
}
cli, err := client.New(cfg.Etcd)
if err != nil {
return
}
n = &Node{
Client: cli,
ttl: cfg.Ttl,
reqTimeout: time.Duration(cfg.ReqTimeout) * time.Second,
prefix: cfg.Proc,
Key: cfg.Proc + ip.String(),
PID: strconv.Itoa(os.Getpid()),
Client: models.DefalutClient,
Node: &models.Node{
ID: ip.String(),
PID: strconv.Itoa(os.Getpid()),
},
ttl: cfg.Ttl,
done: make(chan struct{}),
}
return
}
func (n *Node) String() string {
return "node[" + n.Key[len(n.prefix):] + "] pid[" + n.PID + "]"
}
// 注册到 /cronsun/proc/xx
func (n *Node) Register() (err error) {
pid, err := n.Exist()
pid, err := n.Node.Exist()
if err != nil {
return
}
if pid != -1 {
return fmt.Errorf("node[%s] pid[%d] exist", n.Key[len(n.prefix):], pid)
return fmt.Errorf("node[%s] pid[%d] exist", n.Node.ID, pid)
}
resp, err := n.Client.Grant(context.TODO(), n.ttl)
@ -79,7 +64,7 @@ func (n *Node) Register() (err error) {
return
}
if _, err = n.Client.Put(context.TODO(), n.Key, n.PID, client.WithLease(resp.ID)); err != nil {
if _, err = n.Node.Put(client.WithLease(resp.ID)); err != nil {
return
}
@ -92,40 +77,6 @@ func (n *Node) Register() (err error) {
return
}
// 判断 node 是否已注册到 etcd
// 存在则返回进行 pid不存在返回 -1
func (n *Node) Exist() (pid int, err error) {
ctx, cancel := context.WithTimeout(context.Background(), n.reqTimeout)
resp, err := n.Client.Get(ctx, n.Key, client.WithFromKey())
defer cancel()
if err != nil {
return
}
if len(resp.Kvs) == 0 {
return -1, nil
}
if pid, err = strconv.Atoi(string(resp.Kvs[0].Value)); err != nil {
if _, err = n.Client.Delete(ctx, n.Key, client.WithFromKey()); err != nil {
return
}
return -1, nil
}
p, err := os.FindProcess(pid)
if err != nil {
return -1, nil
}
// TODO: 暂时不考虑 linux/unix 以外的系统
if p != nil && p.Signal(syscall.Signal(0)) == nil {
return
}
return -1, nil
}
// 启动服务
func (n *Node) Run() {
go n.keepAlive()
@ -156,9 +107,6 @@ func (n *Node) keepAlive() {
// 启动服务
func (n *Node) Stop(i interface{}) {
close(n.done)
// 防止断网时卡住
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
n.Client.Delete(ctx, n.Key, client.WithFromKey())
cancel()
n.Node.Del()
n.Client.Close()
}

Loading…
Cancel
Save