From fe16425072f8761a7bdcbc8f0870d117be28d9f5 Mon Sep 17 00:00:00 2001 From: QLeelulu Date: Sat, 24 Feb 2018 14:31:22 +0800 Subject: [PATCH 1/3] remove github.com/coreos/etcd/mvcc/mvccpb --- .travis.yml | 1 - group.go | 7 +++---- job.go | 7 +++---- node/node.go | 8 ++++---- 4 files changed, 10 insertions(+), 13 deletions(-) diff --git a/.travis.yml b/.travis.yml index e9f1f0c..5e9922f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,6 @@ go: - 1.9 install: - go get -u github.com/coreos/etcd/clientv3 -- go get github.com/coreos/etcd/mvcc/mvccpb - go get github.com/rogpeppe/fastuuid - go get golang.org/x/net/context - go get gopkg.in/mgo.v2 diff --git a/group.go b/group.go index e2b99c0..a2ffeea 100644 --- a/group.go +++ b/group.go @@ -6,7 +6,6 @@ import ( "strings" client "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" @@ -65,10 +64,10 @@ func WatchGroups() client.WatchChan { return DefalutClient.Watch(conf.Config.Group, client.WithPrefix(), client.WithPrevKV()) } -func GetGroupFromKv(kv *mvccpb.KeyValue) (g *Group, err error) { +func GetGroupFromKv(key, value []byte) (g *Group, err error) { g = new(Group) - if err = json.Unmarshal(kv.Value, g); err != nil { - err = fmt.Errorf("group[%s] umarshal err: %s", string(kv.Key), err.Error()) + if err = json.Unmarshal(value, g); err != nil { + err = fmt.Errorf("group[%s] umarshal err: %s", string(key), err.Error()) } return } diff --git a/job.go b/job.go index 0cb4b8b..6abce69 100644 --- a/job.go +++ b/job.go @@ -16,7 +16,6 @@ import ( "golang.org/x/net/context" client "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/mvcc/mvccpb" "github.com/shunfei/cronsun/conf" "github.com/shunfei/cronsun/log" @@ -371,10 +370,10 @@ func WatchJobs() client.WatchChan { return DefalutClient.Watch(conf.Config.Cmd, client.WithPrefix()) } -func GetJobFromKv(kv *mvccpb.KeyValue) (job *Job, err error) { +func GetJobFromKv(key, value []byte) (job *Job, err error) { job = new(Job) - if err = json.Unmarshal(kv.Value, job); err != nil { - err = fmt.Errorf("job[%s] umarshal err: %s", string(kv.Key), err.Error()) + if err = json.Unmarshal(value, job); err != nil { + err = fmt.Errorf("job[%s] umarshal err: %s", string(key), err.Error()) return } diff --git a/node/node.go b/node/node.go index da6de2d..5c06a3b 100644 --- a/node/node.go +++ b/node/node.go @@ -364,7 +364,7 @@ func (n *Node) watchJobs() { for _, ev := range wresp.Events { switch { case ev.IsCreate(): - job, err := cronsun.GetJobFromKv(ev.Kv) + job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue @@ -373,7 +373,7 @@ func (n *Node) watchJobs() { job.Init(n.ID) n.addJob(job, true) case ev.IsModify(): - job, err := cronsun.GetJobFromKv(ev.Kv) + job, err := cronsun.GetJobFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue @@ -396,7 +396,7 @@ func (n *Node) watchGroups() { for _, ev := range wresp.Events { switch { case ev.IsCreate(): - g, err := cronsun.GetGroupFromKv(ev.Kv) + g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue @@ -404,7 +404,7 @@ func (n *Node) watchGroups() { n.addGroup(g) case ev.IsModify(): - g, err := cronsun.GetGroupFromKv(ev.Kv) + g, err := cronsun.GetGroupFromKv(ev.Kv.Key, ev.Kv.Value) if err != nil { log.Warnf("err: %s, kv: %s", err.Error(), ev.Kv.String()) continue From 9ff3c7eec190d53f996e22b2a5c7cd5d5539dbc5 Mon Sep 17 00:00:00 2001 From: QLeelulu Date: Sat, 24 Feb 2018 15:29:16 +0800 Subject: [PATCH 2/3] change golang.org/x/net/context to context. etcd better "context deadline exceeded" error info. --- .travis.yml | 1 - client.go | 43 +++++++++++++++++++++++++++++++++--------- job.go | 3 +-- web/session/session.go | 3 ++- 4 files changed, 37 insertions(+), 13 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5e9922f..79ed13d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,6 @@ go: install: - go get -u github.com/coreos/etcd/clientv3 - go get github.com/rogpeppe/fastuuid -- go get golang.org/x/net/context - go get gopkg.in/mgo.v2 - go get github.com/fsnotify/fsnotify - go get github.com/go-gomail/gomail diff --git a/client.go b/client.go index f980b2e..a14ea91 100644 --- a/client.go +++ b/client.go @@ -1,11 +1,11 @@ package cronsun import ( + "context" + "fmt" "strings" "time" - "golang.org/x/net/context" - client "github.com/coreos/etcd/clientv3" "github.com/shunfei/cronsun/conf" @@ -36,7 +36,7 @@ func NewClient(cfg *conf.Conf) (c *Client, err error) { } func (c *Client) Put(key, val string, opts ...client.OpOption) (*client.PutResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Put(ctx, key, val, opts...) } @@ -46,7 +46,7 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse, return c.Put(key, val) } - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) tresp, err := DefalutClient.Txn(ctx). If(client.Compare(client.ModRevision(key), "=", rev)). Then(client.OpPut(key, val)). @@ -65,13 +65,13 @@ func (c *Client) PutWithModRev(key, val string, rev int64) (*client.PutResponse, } func (c *Client) Get(key string, opts ...client.OpOption) (*client.GetResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Get(ctx, key, opts...) } func (c *Client) Delete(key string, opts ...client.OpOption) (*client.DeleteResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Delete(ctx, key, opts...) } @@ -81,20 +81,20 @@ func (c *Client) Watch(key string, opts ...client.OpOption) client.WatchChan { } func (c *Client) Grant(ttl int64) (*client.LeaseGrantResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.Grant(ctx, ttl) } func (c *Client) KeepAliveOnce(id client.LeaseID) (*client.LeaseKeepAliveResponse, error) { - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) defer cancel() return c.Client.KeepAliveOnce(ctx, id) } func (c *Client) GetLock(key string, id client.LeaseID) (bool, error) { key = conf.Config.Lock + key - ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + ctx, cancel := NewEtcdTimeoutContext(c) resp, err := DefalutClient.Txn(ctx). If(client.Compare(client.CreateRevision(key), "=", 0)). Then(client.OpPut(key, "", client.WithLease(id))). @@ -116,3 +116,28 @@ func (c *Client) DelLock(key string) error { func IsValidAsKeyPath(s string) bool { return strings.IndexByte(s, '/') == -1 } + +// etcdTimeoutContext return better error info +type etcdTimeoutContext struct { + context.Context + + etcdEndpoints []string +} + +func (c *etcdTimeoutContext) Err() error { + err := c.Context.Err() + if err == context.DeadlineExceeded { + err = fmt.Errorf("%s: etcd(%v) maybe lost", + err, c.etcdEndpoints) + } + return err +} + +// NewEtcdTimeoutContext return a new etcdTimeoutContext +func NewEtcdTimeoutContext(c *Client) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithTimeout(context.Background(), c.reqTimeout) + etcdCtx := &etcdTimeoutContext{} + etcdCtx.Context = ctx + etcdCtx.etcdEndpoints = c.Endpoints() + return etcdCtx, cancel +} diff --git a/job.go b/job.go index 6abce69..0ff7f14 100644 --- a/job.go +++ b/job.go @@ -2,6 +2,7 @@ package cronsun import ( "bytes" + "context" "encoding/json" "fmt" "os/exec" @@ -13,8 +14,6 @@ import ( "syscall" "time" - "golang.org/x/net/context" - client "github.com/coreos/etcd/clientv3" "github.com/shunfei/cronsun/conf" diff --git a/web/session/session.go b/web/session/session.go index 660eb76..434fa76 100644 --- a/web/session/session.go +++ b/web/session/session.go @@ -3,6 +3,7 @@ package session import ( "bytes" "encoding/gob" + "errors" "net/http" client "github.com/coreos/etcd/clientv3" @@ -120,7 +121,7 @@ func (this *EtcdStore) Store(sess *Session) (err error) { if sess.leaseID == 0 { lresp, err := this.client.Grant(int64(this.conf.Expiration)) if err != nil { - return err + return errors.New("etcd create new lease faild: " + err.Error()) // err } sess.leaseID = lresp.ID } From 5a1d0fe5908f7121e1d8527d8984b8cdef11e73e Mon Sep 17 00:00:00 2001 From: QLeelulu Date: Sat, 24 Feb 2018 15:49:58 +0800 Subject: [PATCH 3/3] bugfix: emit nil event --- event/event.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event/event.go b/event/event.go index 8f23c63..13fea80 100644 --- a/event/event.go +++ b/event/event.go @@ -24,7 +24,7 @@ func On(name string, fs ...func(interface{})) error { } for _, f := range fs { - if fs == nil { + if f == nil { continue }