From 5a4a095e29627f13c480b8a1140bc0d628107aef Mon Sep 17 00:00:00 2001 From: Hongchao Deng Date: Fri, 16 Sep 2016 14:51:17 -0700 Subject: [PATCH] etcd watcher: centralize error handling --- pkg/storage/etcd3/watcher.go | 12 ++++-------- pkg/storage/etcd3/watcher_test.go | 23 +++++++++++++---------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/pkg/storage/etcd3/watcher.go b/pkg/storage/etcd3/watcher.go index c2880a4205..544787925b 100644 --- a/pkg/storage/etcd3/watcher.go +++ b/pkg/storage/etcd3/watcher.go @@ -31,8 +31,6 @@ import ( etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/golang/glog" "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" ) const ( @@ -109,6 +107,10 @@ func (wc *watchChan) run() { select { case err := <-wc.errChan: + if err == context.Canceled { + wc.cancel() // just in case + break + } errResult := parseError(err) if errResult != nil { // error result is guaranteed to be received by user before closing ResultChan. @@ -294,12 +296,6 @@ func parseError(err error) *watch.Event { } func (wc *watchChan) sendError(err error) { - // Context.canceled is an expected behavior. - // We should just stop all goroutines in watchChan without returning error. - // TODO: etcd client should return context.Canceled instead of grpc specific error. - if grpc.Code(err) == codes.Canceled || err == context.Canceled { - return - } select { case wc.errChan <- err: case <-wc.ctx.Done(): diff --git a/pkg/storage/etcd3/watcher_test.go b/pkg/storage/etcd3/watcher_test.go index 8781750e03..4f77e9f4b4 100644 --- a/pkg/storage/etcd3/watcher_test.go +++ b/pkg/storage/etcd3/watcher_test.go @@ -196,17 +196,20 @@ func TestWatchContextCancel(t *testing.T) { defer cluster.Terminate(t) canceledCtx, cancel := context.WithCancel(ctx) cancel() - w := store.watcher.createWatchChan(canceledCtx, "/abc", 0, false, storage.Everything) - // When we do a client.Get with a canceled context, it will return error. - // Nonetheless, when we try to send it over internal errChan, we should detect - // it's context canceled and not send it. - err := w.sync() - w.ctx = ctx - w.sendError(err) + // When we watch with a canceled context, we should detect that it's context canceled. + // We won't take it as error and also close the watcher. + w, err := store.watcher.Watch(canceledCtx, "/abc", 0, false, storage.Everything) + if err != nil { + t.Fatal(err) + } + select { - case err := <-w.errChan: - t.Errorf("cancelling context shouldn't return any error. Err: %v", err) - default: + case _, ok := <-w.ResultChan(): + if ok { + t.Error("ResultChan() should be closed") + } + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timeout after %v", wait.ForeverTestTimeout) } }