diff --git a/pkg/client/tests/listwatch_test.go b/pkg/client/tests/listwatch_test.go index 35a8a256e4..1adaf67adb 100644 --- a/pkg/client/tests/listwatch_test.go +++ b/pkg/client/tests/listwatch_test.go @@ -17,6 +17,7 @@ limitations under the License. package tests import ( + "context" "net/http/httptest" "net/url" "testing" @@ -194,11 +195,20 @@ func (w lw) Watch(options metav1.ListOptions) (watch.Interface, error) { func TestListWatchUntil(t *testing.T) { fw := watch.NewFake() go func() { - var obj *v1.Pod + obj := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + }, + } fw.Modify(obj) }() listwatch := lw{ - list: &v1.PodList{Items: []v1.Pod{{}}}, + list: &v1.PodList{ + ListMeta: metav1.ListMeta{ + ResourceVersion: "1", + }, + Items: []v1.Pod{{}}, + }, watch: fw, } @@ -213,8 +223,9 @@ func TestListWatchUntil(t *testing.T) { }, } - timeout := 10 * time.Second - lastEvent, err := watchtools.ListWatchUntil(timeout, listwatch, conditions...) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + lastEvent, err := watchtools.ListWatchUntil(ctx, listwatch, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index f86791650e..8227b73b69 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -27,15 +27,25 @@ import ( "k8s.io/client-go/tools/pager" ) -// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. -type ListerWatcher interface { +// Lister is any object that knows how to perform an initial list. +type Lister interface { // List should return a list type object; the Items field will be extracted, and the // ResourceVersion field will be used to start the watch in the right place. List(options metav1.ListOptions) (runtime.Object, error) +} + +// Watcher is any object that knows how to start a watch on a resource. +type Watcher interface { // Watch should begin a watch at the specified version. Watch(options metav1.ListOptions) (watch.Interface, error) } +// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. 
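+// It is a composition of the narrower Lister and Watcher interfaces, so consumers that
+// need only one of the two behaviors (e.g. RetryWatcher, which only watches) can depend on just that part.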
+type ListerWatcher interface { + Lister + Watcher +} + // ListFunc knows how to list resources type ListFunc func(options metav1.ListOptions) (runtime.Object, error) diff --git a/staging/src/k8s.io/client-go/tools/watch/BUILD b/staging/src/k8s.io/client-go/tools/watch/BUILD index 9f7a97cd4a..f31c836986 100644 --- a/staging/src/k8s.io/client-go/tools/watch/BUILD +++ b/staging/src/k8s.io/client-go/tools/watch/BUILD @@ -4,18 +4,22 @@ go_library( name = "go_default_library", srcs = [ "informerwatcher.go", + "retrywatcher.go", "until.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/watch", importpath = "k8s.io/client-go/tools/watch", visibility = ["//visibility:public"], deps = [ + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/github.com/davecgh/go-spew/spew:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], ) @@ -24,11 +28,13 @@ go_test( name = "go_default_test", srcs = [ "informerwatcher_test.go", + "retrywatcher_test.go", "until_test.go", ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", @@ -39,6 +45,7 @@ go_test( "//staging/src/k8s.io/client-go/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//vendor/github.com/davecgh/go-spew/spew:go_default_library", + "//vendor/k8s.io/klog:go_default_library", ], ) diff --git a/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go b/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go new file mode 100644 index 0000000000..62c14b0784 --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go @@ -0,0 +1,283 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "context" + "errors" + "fmt" + "io" + "net/http" + "time" + + "github.com/davecgh/go-spew/spew" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +// resourceVersionGetter is an interface used to get resource version from events. 
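+// Any API object embedding metav1.ObjectMeta satisfies it implicitly.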
+// We can't reuse an interface from meta because that would create a cyclic dependency, and we need just this one method.
+type resourceVersionGetter interface {
+	GetResourceVersion() string
+}
+
+// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
+// it will get restarted from the last point without the consumer even knowing about it.
+// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
+// It is especially useful when using watch.UntilWithoutRetry, where premature termination causes issues and flakes.
+// Please note that this is not resilient to the etcd cache no longer having the resource version - you would need to
+// use Informers for that.
+type RetryWatcher struct {
+	lastResourceVersion string
+	watcherClient       cache.Watcher
+	resultChan          chan watch.Event
+	stopChan            chan struct{}
+	doneChan            chan struct{}
+	minRestartDelay     time.Duration
+}
+
+// NewRetryWatcher creates a new RetryWatcher.
+// It will make sure that watches get restarted in case of recoverable errors.
+// The initialResourceVersion will be given to the watch method when first called.
+func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
+	return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
+}
+
+func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
+	switch initialResourceVersion {
+	case "", "0":
+		// TODO: revisit this if we ever get WATCH v2 where it means start "now"
+		//       without doing the synthetic list of objects at the beginning (see #74022)
+		return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
+	default:
+		break
+	}
+
+	rw := &RetryWatcher{
+		lastResourceVersion: initialResourceVersion,
+		watcherClient:       watcherClient,
+		stopChan:            make(chan struct{}),
+		doneChan:            make(chan struct{}),
+		resultChan:          make(chan watch.Event, 0),
+		minRestartDelay:     minRestartDelay,
+	}
+
+	go rw.receive()
+	return rw, nil
+}
+
+func (rw *RetryWatcher) send(event watch.Event) bool {
+	// Writing to an unbuffered channel is a blocking operation,
+	// and we need to check whether stop was requested while doing so.
+	select {
+	case rw.resultChan <- event:
+		return true
+	case <-rw.stopChan:
+		return false
+	}
+}
+
+// doReceive returns true when it is done, false otherwise.
+// If it is not done, the second return value holds the time to wait before calling it again.
+func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
+	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
+		ResourceVersion: rw.lastResourceVersion,
+	})
+	// We are very unlikely to hit EOF here since we are just establishing the call,
+	// but it may happen that the apiserver is just shutting down (e.g. being restarted).
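+	// Note that every error branch below results in a retry; only a nil error lets us
+	// proceed to consuming events from the new watcher.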
+	// This is consistent with how it is handled for informers.
+	switch err {
+	case nil:
+		break
+
+	case io.EOF:
+		// watch closed normally
+		return false, 0
+
+	case io.ErrUnexpectedEOF:
+		klog.V(1).Infof("Watch closed with unexpected EOF: %v", err)
+		return false, 0
+
+	default:
+		msg := "Watch failed: %v"
+		if net.IsProbableEOF(err) {
+			klog.V(5).Infof(msg, err)
+			// Retry
+			return false, 0
+		}
+
+		klog.Errorf(msg, err)
+		// Retry
+		return false, 0
+	}
+
+	if watcher == nil {
+		klog.Error("Watch returned nil watcher")
+		// Retry
+		return false, 0
+	}
+
+	ch := watcher.ResultChan()
+	defer watcher.Stop()
+
+	for {
+		select {
+		case <-rw.stopChan:
+			klog.V(4).Info("Stopping RetryWatcher.")
+			return true, 0
+		case event, ok := <-ch:
+			if !ok {
+				klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
+				return false, 0
+			}
+
+			// We need to inspect the event and get ResourceVersion out of it
+			switch event.Type {
+			case watch.Added, watch.Modified, watch.Deleted:
+				metaObject, ok := event.Object.(resourceVersionGetter)
+				if !ok {
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true, 0
+				}
+
+				resourceVersion := metaObject.GetResourceVersion()
+				if resourceVersion == "" {
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true, 0
+				}
+
+				// All is fine; send the event and update lastResourceVersion
+				ok = rw.send(event)
+				if !ok {
+					return true, 0
+				}
+				rw.lastResourceVersion = resourceVersion
+
+				continue
+
+			case watch.Error:
+				status, ok := event.Object.(*metav1.Status)
+				if !ok {
+					klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
+					// Retry unknown errors
+					return false, 0
+				}
+
+				statusDelay := time.Duration(0)
+				if status.Details != nil {
+					statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
+				}
+
+				switch status.Code {
+				case http.StatusGone:
+					// Never retry RV too old errors
+					_ = rw.send(event)
+					return true, 0
+
+				case http.StatusGatewayTimeout, http.StatusInternalServerError:
+					// Retry
+					return false, statusDelay
+
+				default:
+					// We retry by default. RetryWatcher is meant to proceed unless it is certain
+					// that it can't. If we are not certain, we proceed with retry and leave it
+					// up to the user to time out if needed.
+
+					// Log here so we have a record of hitting the unexpected error
+					// and we can whitelist some error codes if we missed any that are expected.
+					klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object))
+
+					// Retry
+					return false, statusDelay
+				}
+
+			default:
+				klog.Errorf("Failed to recognize Event type %q", event.Type)
+				_ = rw.send(watch.Event{
+					Type:   watch.Error,
+					Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
+				})
+				// We are unable to restart the watch, so we have to stop the loop; otherwise we might
+				// cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+				return true, 0
+			}
+		}
+	}
+}
+
+// receive reads the result from a watcher, restarting it if necessary.
+func (rw *RetryWatcher) receive() {
+	defer close(rw.doneChan)
+	defer close(rw.resultChan)
+
+	klog.V(4).Info("Starting RetryWatcher.")
+	defer klog.V(4).Info("Stopping RetryWatcher.")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		select {
+		case <-rw.stopChan:
+			cancel()
+			return
+		case <-ctx.Done():
+			return
+		}
+	}()
+
+	// We use a non-sliding interval so we don't introduce delays on the happy path when the WATCH call
+	// times out or gets closed and we need to re-establish it, while also avoiding hot loops.
+	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
+		done, retryAfter := rw.doReceive()
+		if done {
+			cancel()
+			return
+		}
+
+		time.Sleep(retryAfter)
+
+		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
+	}, rw.minRestartDelay)
+}
+
+// ResultChan implements Interface.
+func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
+	return rw.resultChan
+}
+
+// Stop implements Interface.
+func (rw *RetryWatcher) Stop() {
+	close(rw.stopChan)
+}
+
+// Done allows the caller to be notified when RetryWatcher stops.
+func (rw *RetryWatcher) Done() <-chan struct{} {
+	return rw.doneChan
+}
diff --git a/staging/src/k8s.io/client-go/tools/watch/retrywatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/retrywatcher_test.go
new file mode 100644
index 0000000000..cd57e51c52
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/watch/retrywatcher_test.go
@@ -0,0 +1,593 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"reflect"
+	"strconv"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/diff"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog"
+)
+
+func init() {
+	// Enable klog, which is used in dependencies
+	klog.InitFlags(nil)
+	flag.Set("logtostderr", "true")
+	flag.Set("v", "9")
+}
+
+type testObject struct {
+	resourceVersion string
+}
+
+func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
+func (o testObject) DeepCopyObject() runtime.Object   { return o }
+func (o testObject) GetResourceVersion() string       { return o.resourceVersion }
+
+func withCounter(w cache.Watcher) (*uint32, cache.Watcher) {
+	var counter uint32
+	return &counter, &cache.ListWatch{
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			atomic.AddUint32(&counter, 1)
+			return w.Watch(options)
+		},
+	}
+}
+
+func makeTestEvent(rv int) watch.Event {
+	return watch.Event{
+		Type: watch.Added,
+		Object: testObject{
+			resourceVersion: fmt.Sprintf("%d", rv),
+		},
+	}
+}
+
+func arrayToChannel(array []watch.Event) chan watch.Event {
+	ch := make(chan watch.Event, len(array))
+
+	for _, event := range array {
+		ch <- event
+	}
+
+	return ch
+}
+
+// parseResourceVersionOrDie is test-only code simulating the server, and thus it can interpret resourceVersion.
+func parseResourceVersionOrDie(resourceVersion string) uint64 {
+	// We can't use etcdstorage.Versioner.ParseResourceVersion() because of import restrictions
+
+	if resourceVersion == "" {
+		return 0
+	}
+	version, err := strconv.ParseUint(resourceVersion, 10, 64)
+	if err != nil {
+		panic(fmt.Errorf("failed to parse resourceVersion %q", resourceVersion))
+	}
+	return version
+}
+
+func fromRV(resourceVersion string, array []watch.Event) []watch.Event {
+	var result []watch.Event
+	rv := parseResourceVersionOrDie(resourceVersion)
+	for _, event := range array {
+		if event.Type == watch.Error {
+			if len(result) == 0 {
+				// Skip error events until we find an object matching the RV requirement
+				continue
+			}
+		} else {
+			rvGetter, ok := event.Object.(resourceVersionGetter)
+			if ok {
+				if parseResourceVersionOrDie(rvGetter.GetResourceVersion()) <= rv {
+					continue
+				}
+			}
+		}
+
+		result = append(result, event)
+	}
+
+	return result
+}
+
+func closeAfterN(n int, source chan watch.Event) chan watch.Event {
+	result := make(chan watch.Event, 0)
+	go func() {
+		defer close(result)
+		defer close(source)
+		for i := 0; i < n; i++ {
+			result <- <-source
+		}
+	}()
+	return result
+}
+
+type unexpectedError struct {
+	// Embedding any struct fulfilling the runtime.Object interface would do.
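+	// metav1.Status is convenient here; the distinct wrapper type is what makes the event
+	// object fail RetryWatcher's *metav1.Status type assertion in the test below.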
+	metav1.Status
+}
+
+var _ runtime.Object = &unexpectedError{}
+
+func TestNewRetryWatcher(t *testing.T) {
+	tt := []struct {
+		name      string
+		initialRV string
+		err       error
+	}{
+		{
+			name:      "empty RV should fail",
+			initialRV: "",
+			err:       errors.New("initial RV \"\" is not supported due to issues with underlying WATCH"),
+		},
+		{
+			name:      "RV \"0\" should fail",
+			initialRV: "0",
+			err:       errors.New("initial RV \"0\" is not supported due to issues with underlying WATCH"),
+		},
+	}
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			_, err := NewRetryWatcher(tc.initialRV, nil)
+			if !reflect.DeepEqual(err, tc.err) {
+				t.Errorf("Expected error: %v, got: %v", tc.err, err)
+			}
+		})
+	}
+}
+
+func TestRetryWatcher(t *testing.T) {
+	tt := []struct {
+		name        string
+		initialRV   string
+		watchClient cache.Watcher
+		watchCount  uint32
+		expected    []watch.Event
+	}{
+		{
+			name:      "recovers if watchClient returns error",
+			initialRV: "1",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
+					firstRun := true
+					return func(options metav1.ListOptions) (watch.Interface, error) {
+						if firstRun {
+							firstRun = false
+							return nil, fmt.Errorf("test error")
+						}
+
+						return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+							makeTestEvent(2),
+						}))), nil
+					}
+				}(),
+			},
+			watchCount: 2,
+			expected: []watch.Event{
+				makeTestEvent(2),
+			},
+		},
+		{
+			name:      "recovers if watchClient returns nil watcher",
+			initialRV: "1",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
+					firstRun := true
+					return func(options metav1.ListOptions) (watch.Interface, error) {
+						if firstRun {
+							firstRun = false
+							return nil, nil
+						}
+
+						return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+							makeTestEvent(2),
+						}))), nil
+					}
+				}(),
+			},
+			watchCount: 2,
+			expected: []watch.Event{
+				makeTestEvent(2),
+			},
+		},
+		{
+			name:      "works with initialRV set and no preceding items",
+			initialRV: "1",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(2),
+					}))), nil
+				},
+			},
+			watchCount: 1,
+			expected: []watch.Event{
+				makeTestEvent(2),
+			},
+		},
+		{
+			name:      "works with initialRV set, skipping the preceding items but reading those directly following",
+			initialRV: "1",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(1),
+						makeTestEvent(2),
+					}))), nil
+				},
+			},
+			watchCount: 1,
+			expected: []watch.Event{
+				makeTestEvent(2),
+			},
+		},
+		{
+			name:      "works with initialRV set, skipping the preceding items with none following",
+			initialRV: "3",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(2),
+					}))), nil
+				},
+			},
+			watchCount: 1,
+			expected:   nil,
+		},
+		{
+			name:      "fails on Gone (RV too old error)",
+			initialRV: "5",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(5),
+						makeTestEvent(6),
+						{Type: watch.Error, Object:
&apierrors.NewGone("").ErrStatus}, + makeTestEvent(7), + makeTestEvent(8), + }))), nil + }, + }, + watchCount: 1, + expected: []watch.Event{ + makeTestEvent(6), + { + Type: watch.Error, + Object: &apierrors.NewGone("").ErrStatus, + }, + }, + }, + { + name: "recovers from timeout error", + initialRV: "5", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(6), + { + Type: watch.Error, + Object: &apierrors.NewTimeoutError("", 0).ErrStatus, + }, + makeTestEvent(7), + }))), nil + }, + }, + watchCount: 2, + expected: []watch.Event{ + makeTestEvent(6), + makeTestEvent(7), + }, + }, + { + name: "recovers from internal server error", + initialRV: "5", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(6), + { + Type: watch.Error, + Object: &apierrors.NewInternalError(errors.New("")).ErrStatus, + }, + makeTestEvent(7), + }))), nil + }, + }, + watchCount: 2, + expected: []watch.Event{ + makeTestEvent(6), + makeTestEvent(7), + }, + }, + { + name: "recovers from unexpected error code", + initialRV: "5", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(6), + { + Type: watch.Error, + Object: &metav1.Status{ + Code: 666, + }, + }, + makeTestEvent(7), + }))), nil + }, + }, + watchCount: 2, + expected: []watch.Event{ + makeTestEvent(6), + makeTestEvent(7), + }, + }, + { + name: "recovers from unexpected error type", + initialRV: "5", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(6), + { + Type: watch.Error, + Object: &unexpectedError{}, + }, + makeTestEvent(7), + }))), nil + }, + }, + watchCount: 2, + expected: []watch.Event{ + makeTestEvent(6), + makeTestEvent(7), + }, + }, + { + name: "survives 1 closed watch and reads 1 item", + initialRV: "5", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(6), + })))), nil + }, + }, + watchCount: 2, + expected: []watch.Event{ + makeTestEvent(6), + }, + }, + { + name: "survives 2 closed watches and reads 2 items", + initialRV: "4", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(5), + makeTestEvent(6), + })))), nil + }, + }, + watchCount: 3, + expected: []watch.Event{ + makeTestEvent(5), + makeTestEvent(6), + }, + }, + { + name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs", + initialRV: "4", + watchClient: &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{ + makeTestEvent(5), + makeTestEvent(7), + })))), nil + }, + }, + watchCount: 3, + expected: []watch.Event{ + makeTestEvent(5), + 
makeTestEvent(7),
+			},
+		},
+		{
+			name:      "survives 2 closed watches and reads 2 items for nonconsecutive RVs starting at much lower RV",
+			initialRV: "2",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(5),
+						makeTestEvent(7),
+					})))), nil
+				},
+			},
+			watchCount: 3,
+			expected: []watch.Event{
+				makeTestEvent(5),
+				makeTestEvent(7),
+			},
+		},
+		{
+			name:      "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs",
+			initialRV: "2",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(5),
+						makeTestEvent(6),
+						makeTestEvent(7),
+						makeTestEvent(11),
+					})))), nil
+				},
+			},
+			watchCount: 5,
+			expected: []watch.Event{
+				makeTestEvent(5),
+				makeTestEvent(6),
+				makeTestEvent(7),
+				makeTestEvent(11),
+			},
+		},
+		{
+			name:      "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs and skips those with lower or equal RV",
+			initialRV: "2",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(1),
+						makeTestEvent(2),
+						makeTestEvent(5),
+						makeTestEvent(6),
+						makeTestEvent(7),
+						makeTestEvent(11),
+					})))), nil
+				},
+			},
+			watchCount: 5,
+			expected: []watch.Event{
+				makeTestEvent(5),
+				makeTestEvent(6),
+				makeTestEvent(7),
+				makeTestEvent(11),
+			},
+		},
+		{
+			name:      "survives 2 closed watches and reads 2+2+1 items skipping those with equal RV",
+			initialRV: "1",
+			watchClient: &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return watch.NewProxyWatcher(closeAfterN(2, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
+						makeTestEvent(1),
+						makeTestEvent(2),
+						makeTestEvent(5),
+						makeTestEvent(6),
+						makeTestEvent(7),
+						makeTestEvent(11),
+					})))), nil
+				},
+			},
+			watchCount: 3,
+			expected: []watch.Event{
+				makeTestEvent(2),
+				makeTestEvent(5),
+				makeTestEvent(6),
+				makeTestEvent(7),
+				makeTestEvent(11),
+			},
+		},
+	}
+
+	for _, tc := range tt {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
+			atomicCounter, watchFunc := withCounter(tc.watchClient)
+			watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0))
+			if err != nil {
+				t.Fatalf("failed to create a RetryWatcher: %v", err)
+			}
+			defer func() {
+				watcher.Stop()
+				t.Log("Waiting on RetryWatcher to stop...")
+				<-watcher.Done()
+			}()
+
+			var got []watch.Event
+			for i := 0; i < len(tc.expected); i++ {
+				event, ok := <-watcher.ResultChan()
+				if !ok {
+					t.Error(spew.Errorf("expected event %#+v, but channel is closed", tc.expected[i]))
+					break
+				}
+
+				got = append(got, event)
+			}
+
+			// (Sanity check, best effort.) Make sure there are no more events to be received.
+			// RetryWatcher proxies the source channel, so we can't try reading it immediately
+			// but have to tolerate some delay. Given this is best-effort detection, we can use a short duration.
+			// It also makes sure that for 0 events the watchFunc has time to be called.
+			select {
+			case event, ok := <-watcher.ResultChan():
+				if ok {
+					t.Error(spew.Errorf("Unexpected event received after reading all the expected ones: %#+v", event))
+				}
+			case <-time.After(10 * time.Millisecond):
+				break
+			}
+
+			counter := atomic.LoadUint32(atomicCounter)
+			if counter != tc.watchCount {
+				t.Errorf("expected %d watcher starts, but it has started %d times", tc.watchCount, counter)
+			}
+
+			if !reflect.DeepEqual(tc.expected, got) {
+				t.Fatal(spew.Errorf("expected %#+v, got %#+v;\ndiff: %s", tc.expected, got, diff.ObjectReflectDiff(tc.expected, got)))
+			}
+		})
+	}
+}
+
+func TestRetryWatcherToFinishWithUnreadEvents(t *testing.T) {
+	watcher, err := NewRetryWatcher("1", &cache.ListWatch{
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			return watch.NewProxyWatcher(arrayToChannel([]watch.Event{
+				makeTestEvent(2),
+			})), nil
+		},
+	})
+	if err != nil {
+		t.Fatalf("failed to create a RetryWatcher: %v", err)
+	}
+
+	// Give the watcher a chance to get to sending events (blocking)
+	time.Sleep(10 * time.Millisecond)
+
+	watcher.Stop()
+
+	select {
+	case <-watcher.Done():
+		break
+	case <-time.After(10 * time.Millisecond):
+		t.Error("Failed to close the watcher")
+	}
+
+	// The RetryWatcher result channel should be closed
+	_, ok := <-watcher.ResultChan()
+	if ok {
+		t.Error("ResultChan is not closed")
+	}
+}
diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go
index 541343711d..e12d82aca4 100644
--- a/staging/src/k8s.io/client-go/tools/watch/until.go
+++ b/staging/src/k8s.io/client-go/tools/watch/until.go
@@ -95,6 +95,25 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
 	return lastEvent, nil
 }
 
+// Until wraps the watcherClient's watch function with a RetryWatcher, making sure that the watcher gets restarted in case of errors.
+// The initialResourceVersion will be given to the watch method when first called. It shall not be "" or "0",
+// given the underlying WATCH call issues (#74022). If you want the initial list ("", "0") done for you, use ListWatchUntil instead.
+// The remaining behaviour is identical to function UntilWithoutRetry. (See above.)
+// Until can deal with API timeouts and lost connections.
+// It guarantees that you will see all events, in the order they happened.
+// Due to this guarantee there is no way it can deal with a 'resource version too old' error; it will fail in that case.
+// (See `UntilWithSync` if you'd prefer to recover from all errors, including RV too old, by re-listing
+// those items. In normal code you should be level driven, and then not seeing every edge does not matter.)
+// The most frequent usage for Until would be a test where you want to verify the exact order of events ("edges").
+func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
+	w, err := NewRetryWatcher(initialResourceVersion, watcherClient)
+	if err != nil {
+		return nil, err
+	}
+
+	return UntilWithoutRetry(ctx, w, conditions...)
+}
+
 // UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
 // and watches the output until each provided condition succeeds, in a way that is identical
 // to function UntilWithoutRetry. (See above.)
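For illustration, a minimal sketch of how the new Until entry point could be driven by a caller, using the pre-context client-go API this patch targets. The waitForSecretModified helper, clientset wiring, namespace, secret name, field selector, and condition are assumptions for the example, not part of this patch:

package example

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// waitForSecretModified blocks until the named Secret is modified or ctx expires.
// Because Until is backed by RetryWatcher, closed connections and API timeouts
// are survived transparently, resuming from the last seen resourceVersion.
func waitForSecretModified(c kubernetes.Interface, namespace, name string) (*watch.Event, error) {
	// Until requires a concrete starting resourceVersion ("" and "0" are rejected),
	// so fetch the object first and start watching from its current RV.
	secret, err := c.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}

	lw := &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fmt.Sprintf("metadata.name=%s", name)
			return c.CoreV1().Secrets(namespace).Watch(options)
		},
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	return watchtools.Until(ctx, secret.ResourceVersion, lw, func(event watch.Event) (bool, error) {
		return event.Type == watch.Modified, nil
	})
}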
@@ -149,13 +168,14 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (
 	return context.WithTimeout(parent, timeout)
 }
 
-// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
-// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
-// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
-// TODO: remove when no longer used
-//
-// Deprecated: Use UntilWithSync instead.
-func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
+// ListWatchUntil first lists objects, converts them into synthetic ADDED events,
+// and checks the conditions for those synthetic events. If the conditions have not been reached so far,
+// it continues by calling Until, which establishes a watch from the resourceVersion of the list call
+// to evaluate those conditions based on new events.
+// ListWatchUntil provides the same guarantees as Until and replaces the old WATCH from RV "" (or "0"),
+// which mixed list and watch calls internally and had severe design issues (see #74022).
+// There is no resourceVersion ordering guarantee for the initial list and those synthetic events.
+func ListWatchUntil(ctx context.Context, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
 	if len(conditions) == 0 {
 		return nil, nil
 	}
@@ -212,17 +232,5 @@ func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ..
 	}
 	currResourceVersion := metaObj.GetResourceVersion()
 
-	watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
-	if err != nil {
-		return nil, err
-	}
-
-	ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout)
-	defer cancel()
-	evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
-	if err == ErrWatchClosed {
-		// present a consistent error interface to callers
-		err = wait.ErrWaitTimeout
-	}
-	return evt, err
+	return Until(ctx, currResourceVersion, lw, remainingConditions...)
 }
diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go
index 1bd288788a..2d723c9611 100644
--- a/test/integration/apimachinery/watch_restart_test.go
+++ b/test/integration/apimachinery/watch_restart_test.go
@@ -172,6 +172,21 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
 		}, // regular watcher; unfortunately destined to fail
 		normalizeOutputFunc: noopNormalization,
 	},
+	{
+		name:    "RetryWatcher survives closed watches",
+		succeed: true,
+		secret:  newTestSecret("secret-02"),
+		getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) {
+			lw := &cache.ListWatch{
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return getWatchFunc(c, secret)(options)
+				},
+			}
+			w, err := watchtools.NewRetryWatcher(secret.ResourceVersion, lw)
+			return w, err, func() { <-w.Done() }
+		},
+		normalizeOutputFunc: noopNormalization,
+	},
 	{
 		name:    "InformerWatcher survives closed watches",
 		succeed: true,