Add Until based on RetryWatcher

pull/564/head
Tomas Nozicka 2019-01-03 13:45:46 +01:00
parent 836db5c90e
commit 09af8485f2
5 changed files with 922 additions and 2 deletions

View File

@@ -27,15 +27,25 @@ import (
"k8s.io/client-go/tools/pager"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
// Lister is any object that knows how to perform an initial list.
type Lister interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
}
// Watcher is any object that knows how to start a watch on a resource.
type Watcher interface {
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
Lister
Watcher
}
// ListFunc knows how to list resources
type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
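With this split, code that only needs to start watches - like the RetryWatcher added below - can accept a cache.Watcher instead of the full ListerWatcher. A minimal sketch (not part of this diff) of satisfying just the Watcher side with a WatchFunc-only ListWatch; the watchSecrets helper and its arguments are illustrative:
// watchSecrets returns a cache.Watcher for Secrets in the given namespace.
// Assumed imports:
//   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
//   "k8s.io/apimachinery/pkg/watch"
//   "k8s.io/client-go/kubernetes"
//   "k8s.io/client-go/tools/cache"
func watchSecrets(client kubernetes.Interface, ns string) cache.Watcher {
	return &cache.ListWatch{
		// Only WatchFunc is set; *ListWatch still satisfies the narrower
		// Watcher interface, so no list function is required.
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Secrets(ns).Watch(options)
		},
	}
}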

View File

@@ -0,0 +1,283 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/davecgh/go-spew/spew"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)
// resourceVersionGetter is an interface used to get the resource version from events.
// We can't reuse an interface from meta because that would create a cyclic dependency, and we need just this one method.
type resourceVersionGetter interface {
GetResourceVersion() string
}
// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to an API timeout or etcd timeout)
// it will get restarted from the last point without the consumer even knowing about it.
// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
// It is especially useful when using watch.UntilWithoutRetry, where premature termination causes issues and flakes.
// Please note that this is not resilient to the etcd cache no longer having the resource version - you would need to
// use Informers for that.
type RetryWatcher struct {
lastResourceVersion string
watcherClient cache.Watcher
resultChan chan watch.Event
stopChan chan struct{}
doneChan chan struct{}
minRestartDelay time.Duration
}
// NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches get restarted in case of recoverable errors.
// The initialResourceVersion will be given to the watch method when first called.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
}
func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
switch initialResourceVersion {
case "", "0":
// TODO: revisit this if we ever get WATCH v2 where it means start "now"
// without doing the synthetic list of objects at the beginning (see #74022)
return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
default:
break
}
rw := &RetryWatcher{
lastResourceVersion: initialResourceVersion,
watcherClient: watcherClient,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0),
minRestartDelay: minRestartDelay,
}
go rw.receive()
return rw, nil
}
func (rw *RetryWatcher) send(event watch.Event) bool {
// Writing to an unbuffered channel is a blocking operation,
// and we need to check whether stop was requested while doing so.
select {
case rw.resultChan <- event:
return true
case <-rw.stopChan:
return false
}
}
// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
ResourceVersion: rw.lastResourceVersion,
})
// We are very unlikely to hit EOF here since we are just establishing the call,
// but it may happen that the apiserver is just shutting down (e.g. being restarted).
// This is consistent with how it is handled for informers.
switch err {
case nil:
break
case io.EOF:
// watch closed normally
return false, 0
case io.ErrUnexpectedEOF:
klog.V(1).Infof("Watch closed with unexpected EOF: %v", err)
return false, 0
default:
msg := "Watch failed: %v"
if net.IsProbableEOF(err) {
klog.V(5).Infof(msg, err)
// Retry
return false, 0
}
klog.Errorf(msg, err)
// Retry
return false, 0
}
if watcher == nil {
klog.Error("Watch returned nil watcher")
// Retry
return false, 0
}
ch := watcher.ResultChan()
defer watcher.Stop()
for {
select {
case <-rw.stopChan:
klog.V(4).Info("Stopping RetryWatcher.")
return true, 0
case event, ok := <-ch:
if !ok {
klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
return false, 0
}
// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" {
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
})
// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
// All is fine; send the event and update lastResourceVersion
ok = rw.send(event)
if !ok {
return true, 0
}
rw.lastResourceVersion = resourceVersion
continue
case watch.Error:
status, ok := event.Object.(*metav1.Status)
if !ok {
klog.Error(spew.Sprintf("Received an error which is not *metav1.Status but %#+v", event.Object))
// Retry unknown errors
return false, 0
}
statusDelay := time.Duration(0)
if status.Details != nil {
statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
}
switch status.Code {
case http.StatusGone:
// Never retry RV too old errors
_ = rw.send(event)
return true, 0
case http.StatusGatewayTimeout, http.StatusInternalServerError:
// Retry
return false, statusDelay
default:
// We retry by default. RetryWatcher is meant to proceed unless it is certain
// that it can't. If we are not certain, we proceed with retry and leave it
// up to the user to timeout if needed.
// Log here so we have a record of hitting the unexpected error
// and we can whitelist some error codes if we missed any that are expected.
klog.V(5).Info(spew.Sprintf("Retrying after unexpected error: %#+v", event.Object))
// Retry
return false, statusDelay
}
default:
klog.Errorf("Failed to recognize Event type %q", event.Type)
_ = rw.send(watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
})
// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
return true, 0
}
}
}
}
// receive reads the result from a watcher, restarting it if necessary.
func (rw *RetryWatcher) receive() {
defer close(rw.doneChan)
defer close(rw.resultChan)
klog.V(4).Info("Starting RetryWatcher.")
defer klog.V(4).Info("Stopping RetryWatcher.")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-rw.stopChan:
cancel()
return
case <-ctx.Done():
return
}
}()
// We use a non-sliding until so we don't introduce delays on the happy path when the WATCH call
// times out or gets closed and we need to re-establish it, while also avoiding hot loops.
wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
done, retryAfter := rw.doReceive()
if done {
cancel()
return
}
time.Sleep(retryAfter)
klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
}, rw.minRestartDelay)
}
// ResultChan implements Interface.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
return rw.resultChan
}
// Stop implements Interface.
func (rw *RetryWatcher) Stop() {
close(rw.stopChan)
}
// Done allows the caller to be notified when the RetryWatcher stops.
func (rw *RetryWatcher) Done() <-chan struct{} {
return rw.doneChan
}
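A hedged sketch (not part of this diff) of how RetryWatcher is meant to be consumed, reusing the hypothetical watchSecrets helper sketched earlier; rv is assumed to come from a previous LIST and process is a placeholder handler:
// Assumed extra imports:
//   apierrors "k8s.io/apimachinery/pkg/api/errors"
//   watchtools "k8s.io/client-go/tools/watch"
func observeSecrets(client kubernetes.Interface, ns, rv string, process func(watch.Event)) error {
	rw, err := watchtools.NewRetryWatcher(rv, watchSecrets(client, ns))
	if err != nil {
		return err // "" and "0" initial resource versions are rejected
	}
	defer func() {
		rw.Stop()
		<-rw.Done() // wait for the internal goroutine to finish
	}()
	// ResultChan is closed once RetryWatcher gives up; recoverable errors
	// (API timeouts, closed connections) are retried internally and never
	// surface here, while terminal ones like 410 Gone are forwarded.
	for event := range rw.ResultChan() {
		if event.Type == watch.Error {
			return apierrors.FromObject(event.Object)
		}
		process(event)
	}
	return nil
}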

View File

@@ -0,0 +1,593 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"errors"
"flag"
"fmt"
"reflect"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)
func init() {
// Enable klog which is used in dependencies
klog.InitFlags(nil)
flag.Set("logtostderr", "true")
flag.Set("v", "9")
}
type testObject struct {
resourceVersion string
}
func (o testObject) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
func (o testObject) DeepCopyObject() runtime.Object { return o }
func (o testObject) GetResourceVersion() string { return o.resourceVersion }
func withCounter(w cache.Watcher) (*uint32, cache.Watcher) {
var counter uint32
return &counter, &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
atomic.AddUint32(&counter, 1)
return w.Watch(options)
},
}
}
func makeTestEvent(rv int) watch.Event {
return watch.Event{
Type: watch.Added,
Object: testObject{
resourceVersion: fmt.Sprintf("%d", rv),
},
}
}
func arrayToChannel(array []watch.Event) chan watch.Event {
ch := make(chan watch.Event, len(array))
for _, event := range array {
ch <- event
}
return ch
}
// parseResourceVersionOrDie is test-only code simulating the server, and thus it can interpret resourceVersion
func parseResourceVersionOrDie(resourceVersion string) uint64 {
// We can't use etcdstorage.Versioner.ParseResourceVersion() because of imports restrictions
if resourceVersion == "" {
return 0
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
panic(fmt.Errorf("failed to parse resourceVersion %q", resourceVersion))
}
return version
}
func fromRV(resourceVersion string, array []watch.Event) []watch.Event {
var result []watch.Event
rv := parseResourceVersionOrDie(resourceVersion)
for _, event := range array {
if event.Type == watch.Error {
if len(result) == 0 {
// Skip error events until we find an object matching RV requirement
continue
}
} else {
rvGetter, ok := event.Object.(resourceVersionGetter)
if ok {
if parseResourceVersionOrDie(rvGetter.GetResourceVersion()) <= rv {
continue
}
}
}
result = append(result, event)
}
return result
}
func closeAfterN(n int, source chan watch.Event) chan watch.Event {
result := make(chan watch.Event, 0)
go func() {
defer close(result)
defer close(source)
for i := 0; i < n; i++ {
result <- <-source
}
}()
return result
}
type unexpectedError struct {
// Embedding any struct fulfilling the runtime.Object interface would do.
metav1.Status
}
var _ runtime.Object = &unexpectedError{}
func TestNewRetryWatcher(t *testing.T) {
tt := []struct {
name string
initialRV string
err error
}{
{
name: "empty RV should fail",
initialRV: "",
err: errors.New("initial RV \"\" is not supported due to issues with underlying WATCH"),
},
{
name: "RV \"0\" should fail",
initialRV: "0",
err: errors.New("initial RV \"0\" is not supported due to issues with underlying WATCH"),
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
_, err := NewRetryWatcher(tc.initialRV, nil)
if !reflect.DeepEqual(err, tc.err) {
t.Errorf("Expected error: %v, got: %v", tc.err, err)
}
})
}
}
func TestRetryWatcher(t *testing.T) {
tt := []struct {
name string
initialRV string
watchClient cache.Watcher
watchCount uint32
expected []watch.Event
}{
{
name: "recovers if watchClient returns error",
initialRV: "1",
watchClient: &cache.ListWatch{
WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
firstRun := true
return func(options metav1.ListOptions) (watch.Interface, error) {
if firstRun {
firstRun = false
return nil, fmt.Errorf("test error")
}
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(2),
}))), nil
}
}(),
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(2),
},
},
{
name: "recovers if watchClient returns nil watcher",
initialRV: "1",
watchClient: &cache.ListWatch{
WatchFunc: func() func(options metav1.ListOptions) (watch.Interface, error) {
firstRun := true
return func(options metav1.ListOptions) (watch.Interface, error) {
if firstRun {
firstRun = false
return nil, nil
}
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(2),
}))), nil
}
}(),
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(2),
},
},
{
name: "works with empty initialRV",
initialRV: "1",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(2),
}))), nil
},
},
watchCount: 1,
expected: []watch.Event{
makeTestEvent(2),
},
},
{
name: "works with initialRV set, skipping the preceding items but reading those directly following",
initialRV: "1",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(1),
makeTestEvent(2),
}))), nil
},
},
watchCount: 1,
expected: []watch.Event{
makeTestEvent(2),
},
},
{
name: "works with initialRV set, skipping the preceding items with none following",
initialRV: "3",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(2),
}))), nil
},
},
watchCount: 1,
expected: nil,
},
{
name: "fails on Gone (RV too old error)",
initialRV: "5",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(5),
makeTestEvent(6),
{Type: watch.Error, Object: &apierrors.NewGone("").ErrStatus},
makeTestEvent(7),
makeTestEvent(8),
}))), nil
},
},
watchCount: 1,
expected: []watch.Event{
makeTestEvent(6),
{
Type: watch.Error,
Object: &apierrors.NewGone("").ErrStatus,
},
},
},
{
name: "recovers from timeout error",
initialRV: "5",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(6),
{
Type: watch.Error,
Object: &apierrors.NewTimeoutError("", 0).ErrStatus,
},
makeTestEvent(7),
}))), nil
},
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(6),
makeTestEvent(7),
},
},
{
name: "recovers from internal server error",
initialRV: "5",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(6),
{
Type: watch.Error,
Object: &apierrors.NewInternalError(errors.New("")).ErrStatus,
},
makeTestEvent(7),
}))), nil
},
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(6),
makeTestEvent(7),
},
},
{
name: "recovers from unexpected error code",
initialRV: "5",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(6),
{
Type: watch.Error,
Object: &metav1.Status{
Code: 666,
},
},
makeTestEvent(7),
}))), nil
},
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(6),
makeTestEvent(7),
},
},
{
name: "recovers from unexpected error type",
initialRV: "5",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(6),
{
Type: watch.Error,
Object: &unexpectedError{},
},
makeTestEvent(7),
}))), nil
},
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(6),
makeTestEvent(7),
},
},
{
name: "survives 1 closed watch and reads 1 item",
initialRV: "5",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(6),
})))), nil
},
},
watchCount: 2,
expected: []watch.Event{
makeTestEvent(6),
},
},
{
name: "survives 2 closed watches and reads 2 items",
initialRV: "4",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(5),
makeTestEvent(6),
})))), nil
},
},
watchCount: 3,
expected: []watch.Event{
makeTestEvent(5),
makeTestEvent(6),
},
},
{
name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs",
initialRV: "4",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(5),
makeTestEvent(7),
})))), nil
},
},
watchCount: 3,
expected: []watch.Event{
makeTestEvent(5),
makeTestEvent(7),
},
},
{
name: "survives 2 closed watches and reads 2 items for nonconsecutive RVs starting at much lower RV",
initialRV: "2",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(5),
makeTestEvent(7),
})))), nil
},
},
watchCount: 3,
expected: []watch.Event{
makeTestEvent(5),
makeTestEvent(7),
},
},
{
name: "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs",
initialRV: "2",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(5),
makeTestEvent(6),
makeTestEvent(7),
makeTestEvent(11),
})))), nil
},
},
watchCount: 5,
expected: []watch.Event{
makeTestEvent(5),
makeTestEvent(6),
makeTestEvent(7),
makeTestEvent(11),
},
},
{
name: "survives 4 closed watches and reads 4 items for nonconsecutive, spread RVs and skips those with lower or equal RV",
initialRV: "2",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(1, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(1),
makeTestEvent(2),
makeTestEvent(5),
makeTestEvent(6),
makeTestEvent(7),
makeTestEvent(11),
})))), nil
},
},
watchCount: 5,
expected: []watch.Event{
makeTestEvent(5),
makeTestEvent(6),
makeTestEvent(7),
makeTestEvent(11),
},
},
{
name: "survives 2 closed watches and reads 2+2+1 items skipping those with equal RV",
initialRV: "1",
watchClient: &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(closeAfterN(2, arrayToChannel(fromRV(options.ResourceVersion, []watch.Event{
makeTestEvent(1),
makeTestEvent(2),
makeTestEvent(5),
makeTestEvent(6),
makeTestEvent(7),
makeTestEvent(11),
})))), nil
},
},
watchCount: 3,
expected: []watch.Event{
makeTestEvent(2),
makeTestEvent(5),
makeTestEvent(6),
makeTestEvent(7),
makeTestEvent(11),
},
},
}
for _, tc := range tt {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
atomicCounter, watchFunc := withCounter(tc.watchClient)
watcher, err := newRetryWatcher(tc.initialRV, watchFunc, time.Duration(0))
if err != nil {
t.Fatalf("failed to create a RetryWatcher: %v", err)
}
defer func() {
watcher.Stop()
t.Log("Waiting on RetryWatcher to stop...")
<-watcher.Done()
}()
var got []watch.Event
for i := 0; i < len(tc.expected); i++ {
event, ok := <-watcher.ResultChan()
if !ok {
t.Error(spew.Errorf("expected event %#+v, but channel is closed"), tc.expected[i])
break
}
got = append(got, event)
}
// (Sanity check, best effort) Make sure there are no more events to be received.
// RetryWatcher proxies the source channel, so we can't try reading it immediately
// but have to tolerate some delay. Given this is best-effort detection, we can use a short duration.
// It also makes sure that for 0 events the watchFunc has time to be called.
select {
case event, ok := <-watcher.ResultChan():
if ok {
t.Error(spew.Errorf("Unexpected event received after reading all the expected ones: %#+v", event))
}
case <-time.After(10 * time.Millisecond):
break
}
counter := atomic.LoadUint32(atomicCounter)
if counter != tc.watchCount {
t.Errorf("expected %d watcher starts, but it has started %d times", tc.watchCount, counter)
}
if !reflect.DeepEqual(tc.expected, got) {
t.Fatal(spew.Errorf("expected %#+v, got %#+v;\ndiff: %s", tc.expected, got, diff.ObjectReflectDiff(tc.expected, got)))
}
})
}
}
func TestRetryWatcherToFinishWithUnreadEvents(t *testing.T) {
watcher, err := NewRetryWatcher("1", &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return watch.NewProxyWatcher(arrayToChannel([]watch.Event{
makeTestEvent(2),
})), nil
},
})
if err != nil {
t.Fatalf("failed to create a RetryWatcher: %v", err)
}
// Give the watcher a chance to get to sending events (blocking)
time.Sleep(10 * time.Millisecond)
watcher.Stop()
select {
case <-watcher.Done():
break
case <-time.After(10 * time.Millisecond):
t.Error("Failed to close the watcher")
}
// RetryWatcher result channel should be closed
_, ok := <-watcher.ResultChan()
if ok {
t.Error("ResultChan is not closed")
}
}
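The helpers above also make ad-hoc scripting straightforward; a minimal, hypothetical example test (not in this commit) exercising RetryWatcher end to end with the package's own testObject events:
func ExampleRetryWatcher() {
	lw := &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			// The source channel stays open, so this watch never needs a restart.
			return watch.NewProxyWatcher(arrayToChannel([]watch.Event{
				makeTestEvent(2),
				makeTestEvent(3),
			})), nil
		},
	}
	rw, err := NewRetryWatcher("1", lw)
	if err != nil {
		panic(err)
	}
	defer func() {
		rw.Stop()
		<-rw.Done()
	}()
	for i := 0; i < 2; i++ {
		event := <-rw.ResultChan()
		fmt.Println(event.Object.(testObject).GetResourceVersion())
	}
	// Output:
	// 2
	// 3
}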

View File

@@ -95,6 +95,25 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
return lastEvent, nil
}
// Until wraps the watcherClient's watch function with a RetryWatcher, making sure that the watcher gets restarted in case of errors.
// The initialResourceVersion will be given to the watch method when first called. It shall not be "" or "0"
// given the underlying WATCH call issues (#74022). If you want the initial list ("", "0") done for you, use ListWatchUntil instead.
// The remaining behaviour is identical to function UntilWithoutRetry. (See above.)
// Until can deal with API timeouts and lost connections.
// It guarantees that you see all events, in the order they happened.
// Due to this guarantee there is no way it can deal with a 'resource version too old' error; it will fail in that case.
// (See UntilWithSync if you'd prefer to recover from all errors, including RV too old, by re-listing
// those items. Normal code should be level driven, and thus should not care about seeing every edge.)
// The most frequent use of Until is in tests that need to verify the exact order of events ("edges").
func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
w, err := NewRetryWatcher(initialResourceVersion, watcherClient)
if err != nil {
return nil, err
}
return UntilWithoutRetry(ctx, w, conditions...)
}
// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
// and watches the output until each provided condition succeeds, in a way that is identical
// to function UntilWithoutRetry. (See above.)
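A hedged usage sketch for the new Until (not part of this diff): waiting for an object's deletion; the secret variable and lw watcher are illustrative, mirroring the integration test below:
// Assumed imports: "context", v1 "k8s.io/api/core/v1",
// "k8s.io/apimachinery/pkg/util/wait" and "k8s.io/client-go/tools/cache".
func waitForSecretDeletion(ctx context.Context, lw cache.Watcher, secret *v1.Secret) error {
	ctx, cancel := context.WithTimeout(ctx, wait.ForeverTestTimeout)
	defer cancel()
	// Until returns wait.ErrWaitTimeout if the condition isn't met before ctx expires.
	_, err := Until(ctx, secret.ResourceVersion, lw, func(event watch.Event) (bool, error) {
		return event.Type == watch.Deleted, nil
	})
	return err
}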

View File

@@ -172,6 +172,21 @@ func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
}, // regular watcher; unfortunately destined to fail
normalizeOutputFunc: noopNormalization,
},
{
name: "RetryWatcher survives closed watches",
succeed: true,
secret: newTestSecret("secret-02"),
getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error, func()) {
lw := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return getWatchFunc(c, secret)(options)
},
}
w, err := watchtools.NewRetryWatcher(secret.ResourceVersion, lw)
return w, err, func() { <-w.Done() }
},
normalizeOutputFunc: noopNormalization,
},
{
name: "InformerWatcher survives closed watches",
succeed: true,