mirror of https://github.com/shunfei/cronsun
Merge branch 'master' of github.com:shunfei/cronsun into fix/close-smtp
commit
7a0947fde7
|
@ -3,9 +3,7 @@ go:
|
|||
- 1.9
|
||||
install:
|
||||
- go get -u github.com/coreos/etcd/clientv3
|
||||
- go get github.com/coreos/etcd/mvcc/mvccpb
|
||||
- go get github.com/rogpeppe/fastuuid
|
||||
- go get golang.org/x/net/context
|
||||
- go get gopkg.in/mgo.v2
|
||||
- go get github.com/fsnotify/fsnotify
|
||||
- go get github.com/go-gomail/gomail
|
||||
|
|
43
client.go
43
client.go
|
@ -1,11 +1,11 @@
|
|||
package cronsun
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
client "github.com/coreos/etcd/clientv3"
|
||||
|
||||
"github.com/shunfei/cronsun/conf"
|
||||
|
@ -36,7 +36,7 @@ func NewClient(cfg *conf.Conf) (c *Client, err error) {
|
|||
}
|
||||
|
||||
func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
defer cancel()
|
||||
return c.Client.Put(ctx, key, val, opts...)
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse,
|
|||
return c.Put(key, val)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
tresp, err := DefalutClient.Txn(ctx).
|
||||
If(client.Compare(client.ModRevision(key), "=", rev)).
|
||||
Then(client.OpPut(key, val)).
|
||||
|
@ -65,13 +65,13 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse,
|
|||
}
|
||||
|
||||
func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
defer cancel()
|
||||
return c.Client.Get(ctx, key, opts...)
|
||||
}
|
||||
|
||||
func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
defer cancel()
|
||||
return c.Client.Delete(ctx, key, opts...)
|
||||
}
|
||||
|
@ -81,20 +81,20 @@ func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan {
|
|||
}
|
||||
|
||||
func (c *Client) Grant(ttl int64) (*client.LeaseGrantResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
defer cancel()
|
||||
return c.Client.Grant(ctx, ttl)
|
||||
}
|
||||
|
||||
func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
defer cancel()
|
||||
return c.Client.KeepAliveOnce(ctx, id)
|
||||
}
|
||||
|
||||
func (c *Client) GetLock(key string, id client.LeaseID) (bool, error) {
|
||||
key = conf.Config.Lock + key
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
ctx, cancel := NewEtcdTimeoutContext(c)
|
||||
resp, err := DefalutClient.Txn(ctx).
|
||||
If(client.Compare(client.CreateRevision(key), "=", 0)).
|
||||
Then(client.OpPut(key, "", client.WithLease(id))).
|
||||
|
@ -116,3 +116,28 @@ func (c *Client) DelLock(key string) error {
|
|||
func IsValidAsKeyPath(s string) bool {
|
||||
return strings.IndexByte(s, '/') == -1
|
||||
}
|
||||
|
||||
// etcdTimeoutContext return better error info
|
||||
type etcdTimeoutContext struct {
|
||||
context.Context
|
||||
|
||||
etcdEndpoints []string
|
||||
}
|
||||
|
||||
func (c *etcdTimeoutContext) Err() error {
|
||||
err := c.Context.Err()
|
||||
if err == context.DeadlineExceeded {
|
||||
err = fmt.Errorf("%s: etcd(%v) maybe lost",
|
||||
err, c.etcdEndpoints)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// NewEtcdTimeoutContext return a new etcdTimeoutContext
|
||||
func NewEtcdTimeoutContext(c *Client) (context.Context, context.CancelFunc) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout)
|
||||
etcdCtx := &etcdTimeoutContext{}
|
||||
etcdCtx.Context = ctx
|
||||
etcdCtx.etcdEndpoints = c.Endpoints()
|
||||
return etcdCtx, cancel
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ func On(name string, fs ...func(interface{})) error {
|
|||
}
|
||||
|
||||
for _, f := range fs {
|
||||
if fs == nil {
|
||||
if f == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
7
group.go
7
group.go
|
@ -6,7 +6,6 @@ import (
|
|||
"strings"
|
||||
|
||||
client "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
|
||||
"github.com/shunfei/cronsun/conf"
|
||||
"github.com/shunfei/cronsun/log"
|
||||
|
@ -65,10 +64,10 @@ func WatchGroups() client.WatchChan {
|
|||
return DefalutClient.Watch(conf.Config.Group, client.WithPrefix(), client.WithPrevKV())
|
||||
}
|
||||
|
||||
func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) {
|
||||
func GetGroupFromKv(key, value []byte) (g *Group, err error) {
|
||||
g = new(Group)
|
||||
if err = json.Unmarshal(kv.Value, g); err != nil {
|
||||
err = fmt.Errorf("group[%s] umarshal err: %s", string(kv.Key), err.Error())
|
||||
if err = json.Unmarshal(value, g); err != nil {
|
||||
err = fmt.Errorf("group[%s] umarshal err: %s", string(key), err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
10
job.go
10
job.go
|
@ -2,6 +2,7 @@ package cronsun
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
|
@ -13,10 +14,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
client "github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/mvcc/mvccpb"
|
||||
|
||||
"github.com/shunfei/cronsun/conf"
|
||||
"github.com/shunfei/cronsun/log"
|
||||
|
@ -371,10 +369,10 @@ func WatchJobs() client.WatchChan {
|
|||
return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix())
|
||||
}
|
||||
|
||||
func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) {
|
||||
func GetJobFromKv(key, value []byte) (job *Job, err error) {
|
||||
job = new(Job)
|
||||
if err = json.Unmarshal(kv.Value, job); err != nil {
|
||||
err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error())
|
||||
if err = json.Unmarshal(value, job); err != nil {
|
||||
err = fmt.Errorf("job[%s] umarshal err: %s", string(key), err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -364,7 +364,7 @@ func (n *Node) watchJobs() {
|
|||
for _, ev := range wresp.Events {
|
||||
switch {
|
||||
case ev.IsCreate():
|
||||
job, err := cronsun.GetJobFromKv(ev.Kv)
|
||||
job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value)
|
||||
if err != nil {
|
||||
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
|
||||
continue
|
||||
|
@ -373,7 +373,7 @@ func (n *Node) watchJobs() {
|
|||
job.Init(n.ID)
|
||||
n.addJob(job, true)
|
||||
case ev.IsModify():
|
||||
job, err := cronsun.GetJobFromKv(ev.Kv)
|
||||
job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value)
|
||||
if err != nil {
|
||||
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
|
||||
continue
|
||||
|
@ -396,7 +396,7 @@ func (n *Node) watchGroups() {
|
|||
for _, ev := range wresp.Events {
|
||||
switch {
|
||||
case ev.IsCreate():
|
||||
g, err := cronsun.GetGroupFromKv(ev.Kv)
|
||||
g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value)
|
||||
if err != nil {
|
||||
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
|
||||
continue
|
||||
|
@ -404,7 +404,7 @@ func (n *Node) watchGroups() {
|
|||
|
||||
n.addGroup(g)
|
||||
case ev.IsModify():
|
||||
g, err := cronsun.GetGroupFromKv(ev.Kv)
|
||||
g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value)
|
||||
if err != nil {
|
||||
log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String())
|
||||
continue
|
||||
|
|
|
@ -3,6 +3,7 @@ package session
|
|||
import (
|
||||
"bytes"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"net/http"
|
||||
|
||||
client "github.com/coreos/etcd/clientv3"
|
||||
|
@ -120,7 +121,7 @@ func (this *EtcdStore) Store(sess *Session) (err error) {
|
|||
if sess.leaseID == 0 {
|
||||
lresp, err := this.client.Grant(int64(this.conf.Expiration))
|
||||
if err != nil {
|
||||
return err
|
||||
return errors.New("etcd create new lease faild: " + err.Error()) // err
|
||||
}
|
||||
sess.leaseID = lresp.ID
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue