diff --git a/.travis.yml b/.travis.yml index e9f1f0c..79ed13d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,9 +3,7 @@ go: - 1.9 install: - go get -u github.com/coreos/etcd/clientv3 -- go get github.com/coreos/etcd/mvcc/mvccpb - go get github.com/rogpeppe/fastuuid -- go get golang.org/x/net/context - go get gopkg.in/mgo.v2 - go get github.com/fsnotify/fsnotify - go get github.com/go-gomail/gomail diff --git a/client.go b/client.go index f980b2e..c2b30ec 100644 --- a/client.go +++ b/client.go @@ -1,11 +1,11 @@ package cronsun import ( + "context" + "fmt" "strings" "time" - "golang.org/x/net/context" - client "github.com/coreos/etcd/clientv3" "github.com/shunfei/cronsun/conf" @@ -36,7 +36,7 @@ func NewClient(cfg *conf.Conf) (c *Client, err error) { } func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Put(ctx, key, val, opts...) } @@ -46,7 +46,7 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse, return c.Put(key, val) } - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) tresp, err := DefalutClient.Txn(ctx). If(client.Compare(client.ModRevision(key), "=", rev)). Then(client.OpPut(key, val)). @@ -65,13 +65,13 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse, } func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Get(ctx, key, opts...) } func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Delete(ctx, key, opts...) } @@ -81,20 +81,26 @@ func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan { } func (c *Client) Grant(ttl int64) (*client.LeaseGrantResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Grant(ctx, ttl) } -func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error) { +func (c *Client) Revoke(id client.LeaseID) (*client.LeaseRevokeResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) defer cancel() + return c.Client.Revoke(ctx, id) +} + +func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error) { + ctx, cancel := NewEtcdTimeoutContext(c) + defer cancel() return c.Client.KeepAliveOnce(ctx, id) } func (c *Client) GetLock(key string, id client.LeaseID) (bool, error) { key = conf.Config.Lock + key - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) resp, err := DefalutClient.Txn(ctx). If(client.Compare(client.CreateRevision(key), "=", 0)). Then(client.OpPut(key, "", client.WithLease(id))). @@ -116,3 +122,28 @@ func (c *Client) DelLock(key string) error { func IsValidAsKeyPath(s string) bool { return strings.IndexByte(s, '/') == -1 } + +// etcdTimeoutContext return better error info +type etcdTimeoutContext struct { + context.Context + + etcdEndpoints []string +} + +func (c *etcdTimeoutContext) Err() error { + err := c.Context.Err() + if err == context.DeadlineExceeded { + err = fmt.Errorf("%s: etcd(%v) maybe lost", + err, c.etcdEndpoints) + } + return err +} + +// NewEtcdTimeoutContext return a new etcdTimeoutContext +func NewEtcdTimeoutContext(c *Client) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + etcdCtx := &etcdTimeoutContext{} + etcdCtx.Context = ctx + etcdCtx.etcdEndpoints = c.Endpoints() + return etcdCtx, cancel +} diff --git a/event/event.go b/event/event.go index 8f23c63..13fea80 100644 --- a/event/event.go +++ b/event/event.go @@ -24,7 +24,7 @@ func On(name string, fs ...func(interface{})) error { } for _, f := range fs { - if fs == nil { + if f == nil { continue } diff --git a/group.go b/group.go index e2b99c0..a2ffeea 100644 --- a/group.go +++ b/group.go @@ -6,7 +6,6 @@ import ( "strings" client "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" @@ -65,10 +64,10 @@ func WatchGroups() client.WatchChan { return DefalutClient.Watch(conf.Config.Group, client.WithPrefix(), client.WithPrevKV()) } -func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) { +func GetGroupFromKv(key, value []byte) (g *Group, err error) { g = new(Group) - if err = json.Unmarshal(kv.Value, g); err != nil { - err = fmt.Errorf("group[%s] umarshal err: %s", string(kv.Key), err.Error()) + if err = json.Unmarshal(value, g); err != nil { + err = fmt.Errorf("group[%s] umarshal err: %s", string(key), err.Error()) } return } diff --git a/job.go b/job.go index a371022..fed083c 100644 --- a/job.go +++ b/job.go @@ -2,6 +2,7 @@ package cronsun import ( "bytes" + "context" "encoding/json" "fmt" "os/exec" @@ -13,10 +14,7 @@ import ( "syscall" "time" - "golang.org/x/net/context" - client "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" @@ -120,8 +118,8 @@ func (l *locker) unlock() { close(l.done) l.timer.Stop() - if _, err := DefalutClient.KeepAliveOnce(l.lID); err != nil { - log.Warnf("unlock keep alive err: %s", err.Error()) + if _, err := DefalutClient.Revoke(l.lID); err != nil { + log.Warnf("unlock revoke err: %s", err.Error()) } } @@ -372,10 +370,10 @@ func WatchJobs() client.WatchChan { return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix()) } -func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { +func GetJobFromKv(key, value []byte) (job *Job, err error) { job = new(Job) - if err = json.Unmarshal(kv.Value, job); err != nil { - err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error()) + if err = json.Unmarshal(value, job); err != nil { + err = fmt.Errorf("job[%s] umarshal err: %s", string(key), err.Error()) return } diff --git a/node/node.go b/node/node.go index bffdccc..184ea69 100644 --- a/node/node.go +++ b/node/node.go @@ -371,7 +371,7 @@ func (n *Node) watchJobs() { for _, ev := range wresp.Events { switch { case ev.IsCreate(): - job, err := cronsun.GetJobFromKv(ev.Kv) + job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue @@ -380,7 +380,7 @@ func (n *Node) watchJobs() { job.Init(n.ID) n.addJob(job, true) case ev.IsModify(): - job, err := cronsun.GetJobFromKv(ev.Kv) + job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue @@ -403,7 +403,7 @@ func (n *Node) watchGroups() { for _, ev := range wresp.Events { switch { case ev.IsCreate(): - g, err := cronsun.GetGroupFromKv(ev.Kv) + g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue @@ -411,7 +411,7 @@ func (n *Node) watchGroups() { n.addGroup(g) case ev.IsModify(): - g, err := cronsun.GetGroupFromKv(ev.Kv) + g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue diff --git a/noticer.go b/noticer.go index f95e666..53377ea 100644 --- a/noticer.go +++ b/noticer.go @@ -94,9 +94,8 @@ func (m *Mail) Serve() { if m.open { if err = m.sc.Close(); err != nil { log.Warnf("close smtp server err: %s", err.Error()) - } else { - m.open = false } + m.open = false } m.timer.Reset(time.Duration(m.cf.Keepalive) * time.Second) } diff --git a/web/session/session.go b/web/session/session.go index 660eb76..434fa76 100644 --- a/web/session/session.go +++ b/web/session/session.go @@ -3,6 +3,7 @@ package session import ( "bytes" "encoding/gob" + "errors" "net/http" client "github.com/coreos/etcd/clientv3" @@ -120,7 +121,7 @@ func (this *EtcdStore) Store(sess *Session) (err error) { if sess.leaseID == 0 { lresp, err := this.client.Grant(int64(this.conf.Expiration)) if err != nil { - return err + return errors.New("etcd create new lease faild: " + err.Error()) // err } sess.leaseID = lresp.ID }