diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 98b2533971..a3cf8483b9 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -85,13 +85,13 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn // Run starts a watch and handles watch events. Will restart the watch if it is closed. // Run starts a goroutine and returns immediately. func (r *Reflector) Run() { - go util.Forever(func() { r.listAndWatch() }, r.period) + go util.Forever(func() { r.listAndWatch(util.NeverStop) }, r.period) } // RunUntil starts a watch and handles watch events. Will restart the watch if it is closed. // RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed. func (r *Reflector) RunUntil(stopCh <-chan struct{}) { - go util.Until(func() { r.listAndWatch() }, r.period, stopCh) + go util.Until(func() { r.listAndWatch(stopCh) }, r.period, stopCh) } var ( @@ -100,6 +100,10 @@ var ( // Used to indicate that watching stopped so that a resync could happen. errorResyncRequested = errors.New("resync channel fired") + + // Used to indicate that watching stopped because of a signal from the stop + // channel passed in from a client of the reflector. + errorStopRequested = errors.New("Stop requested") ) // resyncChan returns a channel which will receive something when a resync is required. @@ -110,9 +114,9 @@ func (r *Reflector) resyncChan() <-chan time.Time { return time.After(r.resyncPeriod) } -func (r *Reflector) listAndWatch() { +func (r *Reflector) listAndWatch(stopCh <-chan struct{}) { var resourceVersion string - exitWatch := r.resyncChan() + resyncCh := r.resyncChan() list, err := r.listerWatcher.List() if err != nil { @@ -149,9 +153,9 @@ func (r *Reflector) listAndWatch() { } return } - if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil { + if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil { if err != errorResyncRequested { - glog.Errorf("watch of %v ended with error: %v", r.expectedType, err) + glog.Errorf("watch of %v ended with: %v", r.expectedType, err) } return } @@ -169,14 +173,20 @@ func (r *Reflector) syncWith(items []runtime.Object) error { } // watchHandler watches w and keeps *resourceVersion up to date. -func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error { +func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error { start := time.Now() eventCount := 0 + + // Stopping the watcher should be idempotent and if we return from this function there's no way + // we're coming back in with the same watch interface. + defer w.Stop() + loop: for { select { - case <-exitWatch: - w.Stop() + case <-stopCh: + return errorStopRequested + case <-resyncCh: return errorResyncRequested case event, ok := <-w.ResultChan(): if !ok { diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 7337a7b1f5..ac22c38ee1 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -24,6 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -37,6 +38,60 @@ func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) { return t.WatchFunc(resourceVersion) } +func TestCloseWatchChannelOnError(t *testing.T) { + r := NewReflector(&testLW{}, &api.Pod{}, NewStore(MetaNamespaceKeyFunc), 0) + pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}} + fw := watch.NewFake() + r.listerWatcher = &testLW{ + WatchFunc: func(rv string) (watch.Interface, error) { + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil + }, + } + go r.listAndWatch(util.NeverStop) + fw.Error(pod) + select { + case _, ok := <-fw.ResultChan(): + if ok { + t.Errorf("Watch channel left open after cancellation") + } + case <-time.After(100 * time.Millisecond): + t.Errorf("the cancellation is at least 99 milliseconds late") + break + } +} + +func TestRunUntil(t *testing.T) { + stopCh := make(chan struct{}) + store := NewStore(MetaNamespaceKeyFunc) + r := NewReflector(&testLW{}, &api.Pod{}, store, 0) + fw := watch.NewFake() + r.listerWatcher = &testLW{ + WatchFunc: func(rv string) (watch.Interface, error) { + return fw, nil + }, + ListFunc: func() (runtime.Object, error) { + return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil + }, + } + r.RunUntil(stopCh) + // Synchronously add a dummy pod into the watch channel so we + // know the RunUntil go routine is in the watch handler. + fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}) + stopCh <- struct{}{} + select { + case _, ok := <-fw.ResultChan(): + if ok { + t.Errorf("Watch channel left open after stopping the watch") + } + case <-time.After(100 * time.Millisecond): + t.Errorf("the cancellation is at least 99 milliseconds late") + break + } +} + func TestReflector_resyncChan(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond) @@ -57,7 +112,7 @@ func TestReflector_watchHandlerError(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, neverExitWatch) + err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop) if err == nil { t.Errorf("unexpected non-error") } @@ -77,7 +132,7 @@ func TestReflector_watchHandler(t *testing.T) { fw.Stop() }() var resumeRV string - err := g.watchHandler(fw, &resumeRV, neverExitWatch) + err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop) if err != nil { t.Errorf("unexpected error %v", err) } @@ -126,12 +181,25 @@ func TestReflector_watchHandlerTimeout(t *testing.T) { var resumeRV string exit := make(chan time.Time, 1) exit <- time.Now() - err := g.watchHandler(fw, &resumeRV, exit) + err := g.watchHandler(fw, &resumeRV, exit, util.NeverStop) if err != errorResyncRequested { t.Errorf("expected timeout error, but got %q", err) } } +func TestReflectorStopWatch(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &api.Pod{}, s, 0) + fw := watch.NewFake() + var resumeRV string + stopWatch := make(chan struct{}, 1) + stopWatch <- struct{}{} + err := g.watchHandler(fw, &resumeRV, neverExitWatch, stopWatch) + if err != errorStopRequested { + t.Errorf("expected stop error, got %q", err) + } +} + func TestReflector_listAndWatch(t *testing.T) { createdFakes := make(chan *watch.FakeWatcher) @@ -157,7 +225,7 @@ func TestReflector_listAndWatch(t *testing.T) { } s := NewFIFO(MetaNamespaceKeyFunc) r := NewReflector(lw, &api.Pod{}, s, 0) - go r.listAndWatch() + go r.listAndWatch(util.NeverStop) ids := []string{"foo", "bar", "baz", "qux", "zoo"} var fw *watch.FakeWatcher @@ -274,6 +342,6 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) { }, } r := NewReflector(lw, &api.Pod{}, s, 0) - r.listAndWatch() + r.listAndWatch(util.NeverStop) } }