cronsun/node/node.go

155 lines
2.4 KiB
Go
Raw Normal View History

2017-01-04 09:27:36 +00:00
package node
import (
2017-01-05 12:35:41 +00:00
"fmt"
2017-01-05 02:37:48 +00:00
"os"
2017-01-05 12:35:41 +00:00
"strconv"
"time"
"golang.org/x/net/context"
2017-01-05 02:37:48 +00:00
2017-01-04 09:27:36 +00:00
client "github.com/coreos/etcd/clientv3"
2017-01-05 02:37:48 +00:00
2017-01-06 08:44:42 +00:00
"sunteng/commons/log"
2017-01-05 02:37:48 +00:00
"sunteng/commons/util"
2017-01-05 12:35:41 +00:00
"sunteng/cronsun/conf"
2017-01-09 10:51:52 +00:00
"sunteng/cronsun/models"
"sunteng/cronsun/node/cron"
2017-01-05 12:35:41 +00:00
)
2017-01-04 09:27:36 +00:00
// Node 执行 cron 命令服务的结构体
type Node struct {
2017-01-09 10:51:52 +00:00
*models.Client
*models.Node
*cron.Cron
jobs Job
groups Group
2017-01-05 02:37:48 +00:00
2017-01-09 10:51:52 +00:00
ttl int64
2017-01-05 12:35:41 +00:00
lID client.LeaseID // lease id
2017-01-06 08:44:42 +00:00
lch <-chan *client.LeaseKeepAliveResponse
done chan struct{}
2017-01-04 09:27:36 +00:00
}
2017-01-06 08:44:42 +00:00
func NewNode(cfg *conf.Conf) (n *Node, err error) {
2017-01-05 02:37:48 +00:00
ip, err := util.GetLocalIP()
if err != nil {
return
}
n = &Node{
2017-01-09 10:51:52 +00:00
Client: models.DefalutClient,
Node: &models.Node{
ID: ip.String(),
PID: strconv.Itoa(os.Getpid()),
},
Cron: cron.New(),
2017-01-06 08:44:42 +00:00
2017-01-09 10:51:52 +00:00
ttl: cfg.Ttl,
2017-01-06 08:44:42 +00:00
done: make(chan struct{}),
2017-01-05 02:37:48 +00:00
}
return
2017-01-04 09:27:36 +00:00
}
// 注册到 /cronsun/proc/xx
2017-01-05 12:35:41 +00:00
func (n *Node) Register() (err error) {
2017-01-09 10:51:52 +00:00
pid, err := n.Node.Exist()
2017-01-05 12:35:41 +00:00
if err != nil {
return
}
if pid != -1 {
2017-01-09 10:51:52 +00:00
return fmt.Errorf("node[%s] pid[%d] exist", n.Node.ID, pid)
2017-01-05 12:35:41 +00:00
}
2017-01-06 08:59:31 +00:00
resp, err := n.Client.Grant(context.TODO(), n.ttl)
2017-01-05 12:35:41 +00:00
if err != nil {
return
}
2017-01-09 10:51:52 +00:00
if _, err = n.Node.Put(client.WithLease(resp.ID)); err != nil {
2017-01-05 12:35:41 +00:00
return
}
2017-01-06 08:44:42 +00:00
ch, err := n.Client.KeepAlive(context.TODO(), resp.ID)
if err != nil {
2017-01-05 12:35:41 +00:00
return
}
2017-01-06 08:44:42 +00:00
n.lID, n.lch = resp.ID, ch
2017-01-05 12:35:41 +00:00
return
2017-01-04 09:27:36 +00:00
}
// addJobs adds every schedule of every job in n.jobs to the cron scheduler.
//
// A job whose Schedule call reports no schedules for this node is skipped
// with a warning. A schedule that the scheduler rejects is logged together
// with the reason and the remaining schedules are still processed.
func (n *Node) addJobs() {
	for _, job := range n.jobs {
		schs, ok := job.Schedule(n.ID)
		if !ok {
			log.Warnf("job[%s] has no schedules, will skip", job.ID)
			continue
		}

		for _, sch := range schs {
			if err := n.Cron.AddJob(sch, job); err != nil {
				// The format has three verbs; the error must be supplied as
				// the third operand or the log line loses the actual cause.
				log.Warnf("job[%s] timer[%s] parse err: %s", job.ID, sch, err.Error())
			}
		}
	}
}
2017-01-05 02:37:48 +00:00
// 启动服务
func (n *Node) Run() (err error) {
2017-01-06 08:44:42 +00:00
go n.keepAlive()
defer func() {
if err != nil {
n.Stop(nil)
}
}()
if n.groups, err = models.GetGroups(); err != nil {
return
}
if n.jobs, err = newJob(n.ID, n.groups); err != nil {
return
}
n.addJobs()
// TODO add&del job
n.Cron.Start()
return
2017-01-06 08:44:42 +00:00
}
// 断网掉线重新注册
func (n *Node) keepAlive() {
for {
for _ = range n.lch {
}
2017-01-05 02:37:48 +00:00
2017-01-06 08:44:42 +00:00
select {
case <-n.done:
return
default:
}
time.Sleep(time.Duration(n.ttl+1) * time.Second)
log.Noticef("%s has dropped, try to reconnect...", n.String())
if err := n.Register(); err != nil {
log.Warn(err.Error())
} else {
log.Noticef("%s reconnected", n.String())
}
}
2017-01-05 02:37:48 +00:00
}
// 启动服务
func (n *Node) Stop(i interface{}) {
2017-01-06 08:44:42 +00:00
close(n.done)
2017-01-09 10:51:52 +00:00
n.Node.Del()
2017-01-05 02:37:48 +00:00
n.Client.Close()
n.Cron.Stop()
2017-01-05 02:37:48 +00:00
}