From 866cc1acab6c1c30a7550b2de7160c8052be884d Mon Sep 17 00:00:00 2001
From: Tomas Nozicka
Date: Fri, 3 Aug 2018 16:45:41 +0200
Subject: [PATCH] Add UntilWithSync (informer based)

---
 .../k8s.io/apimachinery/pkg/watch/watch.go    |  47 ++++
 .../apimachinery/pkg/watch/watch_test.go      |  38 +++
 .../client-go/tools/watch/informerwatcher.go  | 114 ++++++++
 .../tools/watch/informerwatcher_test.go       | 236 ++++++++++++++++
 .../src/k8s.io/client-go/tools/watch/until.go |  42 +++
 .../client-go/tools/watch/until_test.go       | 129 +++++++++
 test/integration/apimachinery/main_test.go    |  27 ++
 .../apimachinery/watch_restart_test.go        | 258 ++++++++++++++++++
 8 files changed, 891 insertions(+)
 create mode 100644 staging/src/k8s.io/client-go/tools/watch/informerwatcher.go
 create mode 100644 staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go
 create mode 100644 test/integration/apimachinery/main_test.go
 create mode 100644 test/integration/apimachinery/watch_restart_test.go

diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/watch.go b/staging/src/k8s.io/apimachinery/pkg/watch/watch.go
index 5c1380b234..a627d1d572 100644
--- a/staging/src/k8s.io/apimachinery/pkg/watch/watch.go
+++ b/staging/src/k8s.io/apimachinery/pkg/watch/watch.go
@@ -268,3 +268,50 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
 		}
 	}
 }
+
+// ProxyWatcher lets you wrap your channel in a watch Interface. Thread-safe.
+type ProxyWatcher struct {
+	result chan Event
+	stopCh chan struct{}
+
+	mutex   sync.Mutex
+	stopped bool
+}
+
+var _ Interface = &ProxyWatcher{}
+
+// NewProxyWatcher creates a new ProxyWatcher by wrapping a channel.
+func NewProxyWatcher(ch chan Event) *ProxyWatcher {
+	return &ProxyWatcher{
+		result:  ch,
+		stopCh:  make(chan struct{}),
+		stopped: false,
+	}
+}
+
+// Stop implements Interface. It is safe to call multiple times.
+func (pw *ProxyWatcher) Stop() {
+	pw.mutex.Lock()
+	defer pw.mutex.Unlock()
+	if !pw.stopped {
+		pw.stopped = true
+		close(pw.stopCh)
+	}
+}
+
+// Stopping returns true if Stop() has been called.
+func (pw *ProxyWatcher) Stopping() bool {
+	pw.mutex.Lock()
+	defer pw.mutex.Unlock()
+	return pw.stopped
+}
+
+// ResultChan implements Interface.
+func (pw *ProxyWatcher) ResultChan() <-chan Event {
+	return pw.result
+}
+
+// StopChan returns the stop channel.
+func (pw *ProxyWatcher) StopChan() <-chan struct{} {
+	return pw.stopCh
+}
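To make the new type concrete, here is a minimal, hedged sketch of driving a ProxyWatcher by hand: the producer pushes events into the wrapped channel (giving up if the watcher was stopped), and the consumer reads them back through the standard watch Interface. This example is illustrative and not part of the patch itself.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	ch := make(chan watch.Event, 1)
	w := watch.NewProxyWatcher(ch)
	defer w.Stop() // Stop is idempotent, as TestProxyWatcher below verifies

	// Producer side: push an event, bailing out if the watcher has been stopped.
	select {
	case ch <- watch.Event{Type: watch.Added, Object: nil}:
	case <-w.StopChan():
	}

	// Consumer side: read it back through the watch.Interface.
	e := <-w.ResultChan()
	fmt.Println(e.Type) // prints "ADDED"
}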
diff --git a/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go b/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go
index bdf7fedd4a..4fb159b0dd 100644
--- a/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go
+++ b/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package watch_test
 
 import (
+	"reflect"
 	"testing"
 
 	"k8s.io/apimachinery/pkg/runtime"
@@ -135,3 +136,40 @@ func TestEmpty(t *testing.T) {
 		t.Errorf("unexpected result channel result")
 	}
 }
+
+func TestProxyWatcher(t *testing.T) {
+	events := []Event{
+		{Added, testType("foo")},
+		{Modified, testType("qux")},
+		{Modified, testType("bar")},
+		{Deleted, testType("bar")},
+		{Error, testType("error: blah")},
+	}
+
+	ch := make(chan Event, len(events))
+	w := NewProxyWatcher(ch)
+
+	for _, e := range events {
+		ch <- e
+	}
+
+	for _, e := range events {
+		g := <-w.ResultChan()
+		if !reflect.DeepEqual(e, g) {
+			t.Errorf("Expected %#v, got %#v", e, g)
+			continue
+		}
+	}
+
+	w.Stop()
+
+	select {
+	// A closed channel always receives immediately
+	case <-w.StopChan():
+	default:
+		t.Error("Channel isn't closed")
+	}
+
+	// Stop() must be idempotent, so a second call shouldn't panic
+	w.Stop()
+}
diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go
new file mode 100644
index 0000000000..35a3469493
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher.go
@@ -0,0 +1,114 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"sync"
+	"sync/atomic"
+
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+)
+
+func newTicketer() *ticketer {
+	return &ticketer{
+		cond: sync.NewCond(&sync.Mutex{}),
+	}
+}
+
+type ticketer struct {
+	counter uint64
+
+	cond    *sync.Cond
+	current uint64
+}
+
+func (t *ticketer) GetTicket() uint64 {
+	// Subtract 1 so tickets start from 0
+	return atomic.AddUint64(&t.counter, 1) - 1
+}
+
+func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
+	t.cond.L.Lock()
+	defer t.cond.L.Unlock()
+	for ticket != t.current {
+		t.cond.Wait()
+	}
+
+	f()
+
+	t.current++
+	t.cond.Broadcast()
+}
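The ticketer is what keeps events ordered even though each handler below dispatches into its own goroutine: a handler grabs a ticket synchronously, and WaitForTicket blocks until all earlier tickets have run. A small illustration of that ordering guarantee follows, written as if it lived in the same package as the ticketer above (plus fmt/math/rand/time imports); the ticket count and sleeps are made up.

// demoTicketOrdering assumes the ticketer defined above is in scope.
// Three goroutines finish at random times, yet publish in ticket order.
func demoTicketOrdering() {
	t := newTicketer()
	done := make(chan struct{})

	for i := 0; i < 3; i++ {
		ticket := t.GetTicket() // tickets are handed out in submission order: 0, 1, 2
		go func(n uint64) {
			// Simulate work finishing in arbitrary order.
			time.Sleep(time.Duration(rand.Intn(30)) * time.Millisecond)
			t.WaitForTicket(n, func() {
				fmt.Println("event", n) // always prints 0, 1, 2
				if n == 2 {
					close(done)
				}
			})
		}(ticket)
	}

	<-done
}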
+// NewIndexerInformerWatcher creates an IndexerInformer and wraps it into a watch Interface,
+// so you can use it anywhere you'd use a regular Watcher returned from a Watch method.
+func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) {
+	ch := make(chan watch.Event)
+	w := watch.NewProxyWatcher(ch)
+	t := newTicketer()
+
+	indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			go t.WaitForTicket(t.GetTicket(), func() {
+				select {
+				case ch <- watch.Event{
+					Type:   watch.Added,
+					Object: obj.(runtime.Object),
+				}:
+				case <-w.StopChan():
+				}
+			})
+		},
+		UpdateFunc: func(old, new interface{}) {
+			go t.WaitForTicket(t.GetTicket(), func() {
+				select {
+				case ch <- watch.Event{
+					Type:   watch.Modified,
+					Object: new.(runtime.Object),
+				}:
+				case <-w.StopChan():
+				}
+			})
+		},
+		DeleteFunc: func(obj interface{}) {
+			go t.WaitForTicket(t.GetTicket(), func() {
+				staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
+				if stale {
+					// We have no means of passing this additional information down using the watch API based
+					// on watch.Event, but the caller can filter such objects by checking if metadata.deletionTimestamp is set
+					obj = staleObj.Obj
+				}
+
+				select {
+				case ch <- watch.Event{
+					Type:   watch.Deleted,
+					Object: obj.(runtime.Object),
+				}:
+				case <-w.StopChan():
+				}
+			})
+		},
+	}, cache.Indexers{})
+
+	go func() {
+		informer.Run(w.StopChan())
+	}()
+
+	return indexer, informer, w
+}
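For context, this is roughly how the new watcher is meant to be consumed — a hedged sketch assuming an already-configured clientset; watchSecrets and its parameters are illustrative names, not part of the patch. Unlike a plain Watch call, the informer first replays the existing objects as Added events and re-lists internally when the underlying watch dies.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// watchSecrets is an illustrative helper, not part of the patch.
func watchSecrets(c kubernetes.Interface, namespace string) {
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return c.CoreV1().Secrets(namespace).List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return c.CoreV1().Secrets(namespace).Watch(options)
		},
	}

	_, _, w := watchtools.NewIndexerInformerWatcher(lw, &corev1.Secret{})
	defer w.Stop()

	for event := range w.ResultChan() {
		secret := event.Object.(*corev1.Secret)
		fmt.Printf("%s %s/%s\n", event.Type, secret.Namespace, secret.Name)
	}
}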
diff --git a/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go
new file mode 100644
index 0000000000..e94b4d2563
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/watch/informerwatcher_test.go
@@ -0,0 +1,236 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package watch
+
+import (
+	"math/rand"
+	"reflect"
+	"sort"
+	"testing"
+	"time"
+
+	"github.com/davecgh/go-spew/spew"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/diff"
+	"k8s.io/apimachinery/pkg/watch"
+	fakeclientset "k8s.io/client-go/kubernetes/fake"
+	testcore "k8s.io/client-go/testing"
+	"k8s.io/client-go/tools/cache"
+)
+
+type byEventTypeAndName []watch.Event
+
+func (a byEventTypeAndName) Len() int      { return len(a) }
+func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+func (a byEventTypeAndName) Less(i, j int) bool {
+	if a[i].Type < a[j].Type {
+		return true
+	}
+
+	if a[i].Type > a[j].Type {
+		return false
+	}
+
+	return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
+}
+
+func TestTicketer(t *testing.T) {
+	tg := newTicketer()
+
+	const numTickets = 100 // the current Go race detector limit is 8192 simultaneously alive goroutines
+	var tickets []uint64
+	for i := 0; i < numTickets; i++ {
+		ticket := tg.GetTicket()
+		tickets = append(tickets, ticket)
+
+		exp, got := uint64(i), ticket
+		if got != exp {
+			t.Fatalf("expected ticket %d, got %d", exp, got)
+		}
+	}
+
+	// shuffle tickets
+	rand.Shuffle(len(tickets), func(i, j int) {
+		tickets[i], tickets[j] = tickets[j], tickets[i]
+	})
+
+	res := make(chan uint64, len(tickets))
+	for _, ticket := range tickets {
+		go func(ticket uint64) {
+			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
+			tg.WaitForTicket(ticket, func() {
+				res <- ticket
+			})
+		}(ticket)
+	}
+
+	for i := 0; i < numTickets; i++ {
+		exp, got := uint64(i), <-res
+		if got != exp {
+			t.Fatalf("expected ticket %d, got %d", exp, got)
+		}
+	}
+}
+
+func TestNewInformerWatcher(t *testing.T) {
+	// Make sure this test never produces two events of the same type for a secret of the same name, or it might be flaky.
+	tt := []struct {
+		name    string
+		objects []runtime.Object
+		events  []watch.Event
+	}{
+		{
+			name: "basic test",
+			objects: []runtime.Object{
+				&corev1.Secret{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pod-1",
+					},
+					StringData: map[string]string{
+						"foo-1": "initial",
+					},
+				},
+				&corev1.Secret{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pod-2",
+					},
+					StringData: map[string]string{
+						"foo-2": "initial",
+					},
+				},
+				&corev1.Secret{
+					ObjectMeta: metav1.ObjectMeta{
+						Name: "pod-3",
+					},
+					StringData: map[string]string{
+						"foo-3": "initial",
+					},
+				},
+			},
+			events: []watch.Event{
+				{
+					Type: watch.Added,
+					Object: &corev1.Secret{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "pod-4",
+						},
+						StringData: map[string]string{
+							"foo-4": "initial",
+						},
+					},
+				},
+				{
+					Type: watch.Modified,
+					Object: &corev1.Secret{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "pod-2",
+						},
+						StringData: map[string]string{
+							"foo-2": "new",
+						},
+					},
+				},
+				{
+					Type: watch.Deleted,
+					Object: &corev1.Secret{
+						ObjectMeta: metav1.ObjectMeta{
+							Name: "pod-3",
+						},
+					},
+				},
+			},
+		},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			var expected []watch.Event
+			for _, o := range tc.objects {
+				expected = append(expected, watch.Event{
+					Type:   watch.Added,
+					Object: o.DeepCopyObject(),
+				})
+			}
+			for _, e := range tc.events {
+				expected = append(expected, *e.DeepCopy())
+			}
+
+			fake := fakeclientset.NewSimpleClientset(tc.objects...)
+			fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
+			fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
+
+			for _, e := range tc.events {
+				fakeWatch.Action(e.Type, e.Object)
+			}
+
+			lw := &cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					return fake.Core().Secrets("").List(options)
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					return fake.Core().Secrets("").Watch(options)
+				},
+			}
+			_, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{})
+
+			var result []watch.Event
+		loop:
+			for {
+				var event watch.Event
+				var ok bool
+				select {
+				case event, ok = <-w.ResultChan():
+					if !ok {
+						t.Errorf("Failed to read event: channel is already closed!")
+						return
+					}
+
+					result = append(result, *event.DeepCopy())
+				case <-time.After(time.Second * 1):
+					// All the events are buffered, so a quiet second means we have read everything
+					// The one-second wait also makes sure we would detect RetryWatcher's incorrect behaviour after the last event
+					break loop
+				}
+			}
+
+			// Informers don't guarantee event order so we need to sort these arrays to compare them
+			sort.Sort(byEventTypeAndName(expected))
+			sort.Sort(byEventTypeAndName(result))
+
+			if !reflect.DeepEqual(expected, result) {
+				t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result)))
+				return
+			}
+
+			// Fill in some data to test watch closing while there are some events to be read
+			for _, e := range tc.events {
+				fakeWatch.Action(e.Type, e.Object)
+			}
+
+			// Stop before reading all the data to make sure the informer can deal with a closed channel
+			w.Stop()
+
+			// Wait a bit to make sure the informer doesn't panic
+			// TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591)
+			time.Sleep(1 * time.Second)
+		})
+	}
+
+}
diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go
index 1907b68a3f..9335788439 100644
--- a/staging/src/k8s.io/client-go/tools/watch/until.go
+++ b/staging/src/k8s.io/client-go/tools/watch/until.go
@@ -19,16 +19,22 @@ package watch
 import (
 	"context"
 	"errors"
+	"fmt"
 	"time"
 
 	"github.com/golang/glog"
 
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 )
 
+// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
+// or an error if the condition failed or detected an error state.
+type PreconditionFunc func(store cache.Store) (bool, error)
+
 // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
 // or an error if the condition cannot be checked and should terminate. In general, it is better to define
 // level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
@@ -89,6 +95,42 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
 	return lastEvent, nil
 }
 
+// UntilWithSync creates an informer from lw, optionally checks the precondition once the store has synced,
+// and then watches the output until each provided condition succeeds, in a way that is identical
+// to function UntilWithoutRetry. (See above.)
+// UntilWithSync can deal with errors such as API timeouts, lost connections, and 'Resource version too old'.
+// It is the only function that can recover from 'Resource version too old'; Until and UntilWithoutRetry will
+// just fail in that case. On the other hand, it can't provide guarantees as strong as using a simple
+// Watch method with Until: it can skip some intermediate events if the watch function fails, but it will
+// re-list to recover, and after recovery you always get an event if there has been a change.
+// Also, with the current implementation based on DeltaFIFO, the order of events is guaranteed only for a
+// particular object, not across objects, even if they belong to the same resource.
+// The most frequent usage is a command that needs to watch the "state of the world" and shouldn't fail, like
+// waiting for an object to reach a state, "small" controllers, ...
+func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
+	indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType)
+	// The proxy watcher can be stopped multiple times, so it's fine to use defer here to cover alternative
+	// branches and to let UntilWithoutRetry stop it as well
+	defer watcher.Stop()
+
+	if precondition != nil {
+		if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
+			return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err())
+		}
+
+		done, err := precondition(indexer)
+		if err != nil {
+			return nil, err
+		}
+
+		if done {
+			return nil, nil
+		}
+	}
+
+	return UntilWithoutRetry(ctx, watcher, conditions...)
+}
+
 // ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
 func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
 	if timeout < 0 {
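A hedged end-to-end sketch of the new API: wait until a specific Secret exists, using a precondition to catch the case where the object is already in the synced store, and a condition for subsequent watch events. The clientset setup is elided and waitForSecret is an illustrative name; only the watchtools calls come from this patch.

package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// waitForSecret is an illustrative helper, not spelled out in the patch itself.
func waitForSecret(c kubernetes.Interface, namespace, name string, timeout time.Duration) error {
	// Scope the ListerWatcher to a single object via a field selector,
	// the same pattern the integration test below uses.
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
			return c.CoreV1().Secrets(namespace).List(options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
			return c.CoreV1().Secrets(namespace).Watch(options)
		},
	}

	// ContextWithOptionalTimeout treats 0 as "no timeout", per the hunk above.
	ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
	defer cancel()

	// Precondition: short-circuit if the secret is already present once the store is synced.
	precondition := func(store cache.Store) (bool, error) {
		_, exists, err := store.GetByKey(namespace + "/" + name)
		return exists, err
	}

	// Condition: otherwise wait for the secret to show up as an Added event.
	_, err := watchtools.UntilWithSync(ctx, lw, &corev1.Secret{}, precondition, func(e watch.Event) (bool, error) {
		return e.Type == watch.Added, nil
	})
	if err != nil {
		return fmt.Errorf("waiting for secret %s/%s: %v", namespace, name, err)
	}
	return nil
}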
diff --git a/staging/src/k8s.io/client-go/tools/watch/until_test.go b/staging/src/k8s.io/client-go/tools/watch/until_test.go
index e766acd736..dd0559461e 100644
--- a/staging/src/k8s.io/client-go/tools/watch/until_test.go
+++ b/staging/src/k8s.io/client-go/tools/watch/until_test.go
@@ -19,14 +19,19 @@ package watch
 import (
 	"context"
 	"errors"
+	"reflect"
 	"strings"
 	"testing"
 	"time"
 
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	fakeclient "k8s.io/client-go/kubernetes/fake"
+	"k8s.io/client-go/tools/cache"
 )
 
 type fakePod struct {
@@ -172,3 +177,127 @@ func TestUntilErrorCondition(t *testing.T) {
 		t.Fatalf("expected %q in error string, got %q", expected, err.Error())
 	}
 }
+
+func TestUntilWithSync(t *testing.T) {
+	// FIXME: test preconditions
+	tt := []struct {
+		name             string
+		lw               *cache.ListWatch
+		preconditionFunc PreconditionFunc
+		conditionFunc    ConditionFunc
+		expectedErr      error
+		expectedEvent    *watch.Event
+	}{
+		{
+			name: "doesn't wait for sync with no precondition",
+			lw: &cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					select {}
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					select {}
+				},
+			},
+			preconditionFunc: nil,
+			conditionFunc: func(e watch.Event) (bool, error) {
+				return true, nil
+			},
+			expectedErr:   errors.New("timed out waiting for the condition"),
+			expectedEvent: nil,
+		},
+		{
+			name: "waits indefinitely with precondition if it can't sync",
+			lw: &cache.ListWatch{
+				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+					select {}
+				},
+				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+					select {}
+				},
+			},
+			preconditionFunc: func(store cache.Store) (bool, error) {
+				return true, nil
+			},
+			conditionFunc: func(e watch.Event) (bool, error) {
+				return true, nil
+			},
+			expectedErr:   errors.New("UntilWithSync: unable to sync caches: context deadline exceeded"),
+			expectedEvent: nil,
+		},
+		{
+			name: "precondition can stop the loop",
+			lw: func() *cache.ListWatch {
+				fakeClientset := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
+
+				return &cache.ListWatch{
+					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+						return fakeClientset.CoreV1().Secrets("").List(options)
+					},
+					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+						return fakeClientset.CoreV1().Secrets("").Watch(options)
+					},
+				}
+			}(),
+			preconditionFunc: func(store cache.Store) (bool, error) {
+				_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"})
+				if err != nil {
+					return true, err
+				}
+				if exists {
+					return true, nil
+				}
+				return false, nil
+			},
+			conditionFunc: func(e watch.Event) (bool, error) {
+				return true, errors.New("should never reach this")
+			},
+			expectedErr:   nil,
+			expectedEvent: nil,
+		},
+		{
+			name: "precondition lets it proceed to regular condition",
+			lw: func() *cache.ListWatch {
+				fakeClientset := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
+
+				return &cache.ListWatch{
+					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+						return fakeClientset.CoreV1().Secrets("").List(options)
+					},
+					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+						return fakeClientset.CoreV1().Secrets("").Watch(options)
+					},
+				}
+			}(),
+			preconditionFunc: func(store cache.Store) (bool, error) {
+				return false, nil
+			},
+			conditionFunc: func(e watch.Event) (bool, error) {
+				if e.Type == watch.Added {
+					return true, nil
+				}
+				panic("no other events are expected")
+			},
+			expectedErr:   nil,
+			expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
+		},
+	}
+
+	for _, tc := range tt {
+		t.Run(tc.name, func(t *testing.T) {
+			// The informer waits for caches to sync by polling in 100ms intervals,
+			// so the timeout needs to be reasonably higher than that
+			ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+			defer cancel()
+
+			event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc)
+
+			if !reflect.DeepEqual(err, tc.expectedErr) {
+				t.Errorf("expected error %#v, got %#v", tc.expectedErr, err)
+			}
+
+			if !reflect.DeepEqual(event, tc.expectedEvent) {
+				t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event)
+			}
+		})
+	}
+}
diff --git a/test/integration/apimachinery/main_test.go b/test/integration/apimachinery/main_test.go
new file mode 100644
index 0000000000..673c1cc1a0
--- /dev/null
+++ b/test/integration/apimachinery/main_test.go
@@ -0,0 +1,27 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package apimachinery
+
+import (
+	"testing"
+
+	"k8s.io/kubernetes/test/integration/framework"
+)
+
+func TestMain(m *testing.M) {
+	framework.EtcdMain(m.Run)
+}
diff --git a/test/integration/apimachinery/watch_restart_test.go b/test/integration/apimachinery/watch_restart_test.go
new file mode 100644
index 0000000000..547ab8dd40
--- /dev/null
+++ b/test/integration/apimachinery/watch_restart_test.go
@@ -0,0 +1,258 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package apimachinery
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"k8s.io/api/core/v1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/kubernetes"
+	restclient "k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	watchtools "k8s.io/client-go/tools/watch"
+	"k8s.io/kubernetes/pkg/api/testapi"
+	"k8s.io/kubernetes/test/integration/framework"
+)
+
+func noopNormalization(output []string) []string {
+	return output
+}
+
+func normalizeInformerOutputFunc(initialVal string) func(output []string) []string {
+	return func(output []string) []string {
+		result := make([]string, 0, len(output))
+
+		// Drop the initial value and collapse immediate repetitions
+		lastVal := initialVal
+		for _, v := range output {
+			// Make consecutive values unique, as the informer (List+Watch) duplicates some events
+			if v == lastVal {
+				continue
+			}
+			result = append(result, v)
+			lastVal = v
+		}
+
+		return result
+	}
+}
+
+func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
+	// Has to be longer than 5 seconds, because of the soft timeout in generateEvents below
+	timeout := 2 * time.Minute
+
+	// Set up a master
+	masterConfig := framework.NewIntegrationTestMasterConfig()
+	// The server picks the actual request timeout at random between MinRequestTimeout and 2*MinRequestTimeout
+	masterConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4
+	_, s, closeFn := framework.RunAMaster(masterConfig)
+	defer closeFn()
+
+	config := &restclient.Config{
+		Host:          s.URL,
+		ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[corev1.GroupName].GroupVersion()},
+	}
+
+	namespaceObject := framework.CreateTestingNamespace("retry-watch", s, t)
+	defer framework.DeleteTestingNamespace(namespaceObject, s, t)
+
+	getListFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) *v1.SecretList {
+		return func(options metav1.ListOptions) *v1.SecretList {
+			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
+			res, err := c.CoreV1().Secrets(secret.Namespace).List(options)
+			if err != nil {
+				t.Fatalf("Failed to list Secrets: %v", err)
+			}
+			return res
+		}
+	}
+
+	getWatchFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) (watch.Interface, error) {
+		return func(options metav1.ListOptions) (watch.Interface, error) {
+			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
+			res, err := c.CoreV1().Secrets(secret.Namespace).Watch(options)
+			if err != nil {
+				t.Fatalf("Failed to create a watcher on Secrets: %v", err)
+			}
+			return res, err
+		}
+	}
+
+	generateEvents := func(t *testing.T, c *kubernetes.Clientset, secret *v1.Secret, referenceOutput *[]string, stopChan chan struct{}, stoppedChan chan struct{}) {
+		defer close(stoppedChan)
+		counter := 0
+
+		// These 5 seconds protect against a race at the end, where we could write another value at the same time as watch.Until ends
+		softTimeout := timeout - 5*time.Second
+		if softTimeout < 0 {
+			panic("Timeout has to be greater than 5 seconds!")
+		}
+		endChannel := time.After(softTimeout)
+		for {
+			select {
+			// TODO: get this lower once we figure out how to extend ETCD cache
+			case <-time.After(1000 * time.Millisecond):
+				counter++
+
+				patch := fmt.Sprintf(`{"metadata": {"annotations": {"count": "%d"}}}`, counter)
+				_, err := c.CoreV1().Secrets(secret.Namespace).Patch(secret.Name, types.StrategicMergePatchType, []byte(patch))
+				if err != nil {
+					t.Fatalf("Failed to patch secret: %v", err)
+				}
+
+				*referenceOutput = append(*referenceOutput, fmt.Sprintf("%d", counter))
+			case <-endChannel:
+				return
+			case <-stopChan:
+				return
+			}
+		}
+	}
+
+	initialCount := "0"
+	newTestSecret := func(name string) *v1.Secret {
+		return &v1.Secret{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      name,
+				Namespace: namespaceObject.Name,
+				Annotations: map[string]string{
+					"count": initialCount,
+				},
+			},
+			Data: map[string][]byte{
+				"data": []byte("value1\n"),
+			},
+		}
+	}
+
+	tt := []struct {
+		name                string
+		succeed             bool
+		secret              *v1.Secret
+		getWatcher          func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error)
+		normalizeOutputFunc func(referenceOutput []string) []string
+	}{
+		{
+			name:    "regular watcher should fail",
+			succeed: false,
+			secret:  newTestSecret("secret-01"),
+			getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) {
+				options := metav1.ListOptions{
+					ResourceVersion: secret.ResourceVersion,
+				}
+				return getWatchFunc(c, secret)(options)
+			}, // regular watcher; unfortunately destined to fail
+			normalizeOutputFunc: noopNormalization,
+		},
+		{
+			name:    "InformerWatcher survives closed watches",
+			succeed: true,
+			secret:  newTestSecret("secret-03"),
+			getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) {
+				lw := &cache.ListWatch{
+					ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+						return getListFunc(c, secret)(options), nil
+					},
+					WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+						return getWatchFunc(c, secret)(options)
+					},
+				}
+				_, _, w := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{})
+				return w, nil
+			},
+			normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
+		},
+	}
+
+	for _, tmptc := range tt {
+		tc := tmptc // we need to copy it for parallel runs
+		t.Run(tc.name, func(t *testing.T) {
+			c, err := kubernetes.NewForConfig(config)
+			if err != nil {
+				t.Fatalf("Failed to create clientset: %v", err)
+			}
+
+			secret, err := c.CoreV1().Secrets(tc.secret.Namespace).Create(tc.secret)
+			if err != nil {
+				t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err)
+			}
+
+			watcher, err := tc.getWatcher(c, secret)
+			if err != nil {
+				t.Fatalf("Failed to create watcher: %v", err)
+			}
+
+			var referenceOutput []string
+			var output []string
+			stopChan := make(chan struct{})
+			stoppedChan := make(chan struct{})
+			go generateEvents(t, c, secret, &referenceOutput, stopChan, stoppedChan)
+
+			// Record the current time to be able to assess if the timeout has been reached
+			startTime := time.Now()
+			ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
+			defer cancel()
+			_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
+				s, ok := event.Object.(*v1.Secret)
+				if !ok {
+					t.Fatalf("Received an object that is not a Secret: %#v", event.Object)
+				}
+				output = append(output, s.Annotations["count"])
+				// Watch will never end voluntarily
+				return false, nil
+			})
+			watchDuration := time.Since(startTime)
+			close(stopChan)
+			<-stoppedChan
+
+			output = tc.normalizeOutputFunc(output)
+
+			t.Logf("Watch duration: %v; timeout: %v", watchDuration, timeout)
+
+			if err == nil && !tc.succeed {
+				t.Fatalf("Watch should have timed out but it exited without an error!")
+			}
+
+			if err != wait.ErrWaitTimeout && tc.succeed {
+				t.Fatalf("Watch exited with error: %v!", err)
+			}
+
+			if watchDuration < timeout && tc.succeed {
+				t.Fatalf("Watch should have timed out after %v but it ended prematurely after %v!", timeout, watchDuration)
+			}
+
+			if watchDuration >= timeout && !tc.succeed {
+				t.Fatalf("Watch should have timed out but it succeeded!")
+			}
+
+			if tc.succeed && !reflect.DeepEqual(referenceOutput, output) {
+				t.Fatalf("Reference and real output differ! We must have lost some events or read some multiple times!\nRef:  %#v\nReal: %#v", referenceOutput, output)
+			}
+		})
+	}
+}
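To illustrate why the informer-based case needs normalizeInformerOutputFunc at all: a re-listing informer can replay the initial annotation value and repeat values across watch restarts, so the observed output only matches the reference after consecutive duplicates are collapsed. A hedged standalone sketch of that logic (the observed values are made up):

package main

import "fmt"

// normalize mirrors the logic of normalizeInformerOutputFunc above, reproduced standalone.
func normalize(initialVal string, output []string) []string {
	result := make([]string, 0, len(output))
	lastVal := initialVal
	for _, v := range output {
		// Skip the initial value and any immediate repetition of the previous value.
		if v == lastVal {
			continue
		}
		result = append(result, v)
		lastVal = v
	}
	return result
}

func main() {
	// A re-listing informer may replay the initial count and repeat values.
	observed := []string{"0", "1", "1", "2", "2", "2", "3"}
	fmt.Println(normalize("0", observed)) // prints [1 2 3]
}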