From d70edd3d39d4430d71c4b7c9adba8df5ba7f16c8 Mon Sep 17 00:00:00 2001 From: "fansong.cfs" Date: Tue, 19 Mar 2019 18:16:23 +0800 Subject: [PATCH] add watch bookmark --- .../generic/registry/decorated_watcher.go | 2 +- .../pkg/registry/generic/registry/store.go | 1 + .../k8s.io/apiserver/pkg/storage/cacher/BUILD | 3 + .../apiserver/pkg/storage/cacher/cacher.go | 214 +++++++++++++++-- .../storage/cacher/cacher_whitebox_test.go | 223 +++++++++++++++++- .../pkg/storage/cacher/watch_cache.go | 2 + .../pkg/storage/selection_predicate.go | 13 +- .../k8s.io/apiserver/pkg/storage/tests/BUILD | 3 + .../pkg/storage/tests/cacher_test.go | 134 +++++++++++ .../k8s.io/client-go/rest/watch/decoder.go | 2 +- .../client-go/rest/watch/decoder_test.go | 2 +- .../client-go/rest/watch/encoder_test.go | 4 + .../k8s.io/client-go/tools/cache/reflector.go | 7 + .../client-go/tools/watch/retrywatcher.go | 2 +- 14 files changed, 572 insertions(+), 40 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/decorated_watcher.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/decorated_watcher.go index d6fba97009..005a376d40 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/decorated_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/decorated_watcher.go @@ -55,7 +55,7 @@ func (d *decoratedWatcher) run(ctx context.Context) { return } switch recv.Type { - case watch.Added, watch.Modified, watch.Deleted: + case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: err := d.decorator(recv.Object) if err != nil { send = makeStatusErrorEvent(err) diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 25b867ea9d..1db4845def 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -1075,6 +1075,7 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti resourceVersion := "" if options != nil { resourceVersion = options.ResourceVersion + predicate.AllowWatchBookmarks = options.AllowWatchBookmarks } return e.WatchPredicate(ctx, predicate, resourceVersion) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD index 0f47738e31..ba52a00f1f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD @@ -59,8 +59,11 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 0bb903f8c4..cb1e11437a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -33,6 +33,7 @@ import ( 
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/clock" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" @@ -159,6 +160,53 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache } } +// As we don't need a high precision here, we keep all watchers timeout within a +// second in a bucket, and pop up them once at the timeout. To be more specific, +// if you set fire time at X, you can get the bookmark within (X-1,X+1) period. +// This is NOT thread-safe. +type watcherBookmarkTimeBuckets struct { + watchersBuckets map[int64][]*cacheWatcher + startBucketID int64 + clock clock.Clock +} + +func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets { + return &watcherBookmarkTimeBuckets{ + watchersBuckets: make(map[int64][]*cacheWatcher), + startBucketID: clock.Now().Unix(), + clock: clock, + } +} + +// adds a watcher to the bucket, if the deadline is before the start, it will be +// added to the first one. +func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { + nextTime, ok := w.nextBookmarkTime(t.clock.Now()) + if !ok { + return false + } + bucketID := nextTime.Unix() + if bucketID < t.startBucketID { + bucketID = t.startBucketID + } + watchers, _ := t.watchersBuckets[bucketID] + t.watchersBuckets[bucketID] = append(watchers, w) + return true +} + +func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher { + currentBucketID := t.clock.Now().Unix() + // There should be one or two elements in almost all cases + expiredWatchers := make([][]*cacheWatcher, 0, 2) + for ; t.startBucketID <= currentBucketID; t.startBucketID++ { + if watchers, ok := t.watchersBuckets[t.startBucketID]; ok { + delete(t.watchersBuckets, t.startBucketID) + expiredWatchers = append(expiredWatchers, watchers) + } + } + return expiredWatchers +} + type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool // Cacher is responsible for serving WATCH and LIST requests for a given @@ -197,6 +245,9 @@ type Cacher struct { // Versioner is used to handle resource versions. versioner storage.Versioner + // newFunc is a function that creates new empty object storing a object of type Type. + newFunc func() runtime.Object + // triggerFunc is used for optimizing amount of watchers that needs to process // an incoming event. triggerFunc storage.TriggerPublisherFunc @@ -215,6 +266,7 @@ type Cacher struct { stopCh chan struct{} stopWg sync.WaitGroup + clock clock.Clock // timer is used to avoid unnecessary allocations in underlying watchers. timer *time.Timer @@ -228,6 +280,10 @@ type Cacher struct { // during current dispatching, but stopping was deferred to the end of // dispatching that event to avoid race with closing channels in watchers. watchersToStop []*cacheWatcher + // Maintain a timeout queue to send the bookmark event before the watcher times out. + bookmarkWatchers *watcherBookmarkTimeBuckets + // watchBookmark feature-gate + watchBookmarkEnabled bool } // NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from @@ -251,6 +307,8 @@ func NewCacherFromConfig(config Config) *Cacher { // Configure reflector's pager to for an appropriate pagination chunk size for fetching data from // storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error. 
reflector.WatchListPageSize = storageWatchListPageSize + + clock := clock.RealClock{} cacher := &Cacher{ ready: newReady(), storage: config.Storage, @@ -258,6 +316,7 @@ func NewCacherFromConfig(config Config) *Cacher { watchCache: watchCache, reflector: reflector, versioner: config.Versioner, + newFunc: config.NewFunc, triggerFunc: config.TriggerPublisherFunc, watcherIdx: 0, watchers: indexedWatchers{ @@ -272,8 +331,11 @@ func NewCacherFromConfig(config Config) *Cacher { // - reflector.ListAndWatch // and there are no guarantees on the order that they will stop. // So we will be simply closing the channel, and synchronizing on the WaitGroup. - stopCh: stopCh, - timer: time.NewTimer(time.Duration(0)), + stopCh: stopCh, + clock: clock, + timer: time.NewTimer(time.Duration(0)), + bookmarkWatchers: newTimeBucketWatchers(clock), + watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark), } watchCache.SetOnEvent(cacher.processEvent) go cacher.dispatchEvents() @@ -375,11 +437,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, chanSize = 1000 } + // Determine watch timeout('0' means deadline is not set, ignore checking) + deadline, _ := ctx.Deadline() // Create a watcher here to reduce memory allocations under lock, // given that memory allocation may trigger GC and block the thread. // Also note that emptyFunc is a placeholder, until we will be able // to compute watcher.forget function (which has to happen under lock). - watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner) + watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks) // We explicitly use thread unsafe version and do locking ourself to ensure that // no new events will be processed in the meantime. The watchCache will be unlocked @@ -409,6 +473,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, // Update watcher.forget function once we can compute it. watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported) c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported) + + // Add it to the queue only when server and client support watch bookmarks. + if c.watchBookmarkEnabled && watcher.allowWatchBookmarks { + c.bookmarkWatchers.addWatcher(watcher) + } c.watcherIdx++ }() @@ -672,6 +741,15 @@ func (c *Cacher) processEvent(event *watchCacheEvent) { } func (c *Cacher) dispatchEvents() { + // Jitter to help level out any aggregate load. + bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25)) + // Stop the timer when watchBookmarkFeatureGate is not enabled. + if !c.watchBookmarkEnabled && !bookmarkTimer.Stop() { + <-bookmarkTimer.C() + } + defer bookmarkTimer.Stop() + + lastProcessedResourceVersion := uint64(0) for { select { case event, ok := <-c.incoming: @@ -679,6 +757,24 @@ func (c *Cacher) dispatchEvents() { return } c.dispatchEvent(&event) + lastProcessedResourceVersion = event.ResourceVersion + case <-bookmarkTimer.C(): + bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) + // Never send a bookmark event if we did not see an event here, this is fine + // because we don't provide any guarantees on sending bookmarks. 
+ if lastProcessedResourceVersion == 0 { + continue + } + bookmarkEvent := &watchCacheEvent{ + Type: watch.Bookmark, + Object: c.newFunc(), + ResourceVersion: lastProcessedResourceVersion, + } + if err := c.versioner.UpdateObject(bookmarkEvent.Object, bookmarkEvent.ResourceVersion); err != nil { + klog.Errorf("failure to set resourceVersion to %d on bookmark event %+v", bookmarkEvent.ResourceVersion, bookmarkEvent.Object) + continue + } + c.dispatchEvent(bookmarkEvent) case <-c.stopCh: return } @@ -687,13 +783,36 @@ func (c *Cacher) dispatchEvents() { func (c *Cacher) dispatchEvent(event *watchCacheEvent) { c.startDispatching(event) + defer c.finishDispatching() + // Watchers stopped after startDispatching will be delayed to finishDispatching, // Since add() can block, we explicitly add when cacher is unlocked. - for _, watcher := range c.watchersBuffer { - watcher.add(event, c.timer, c.dispatchTimeoutBudget) + if event.Type == watch.Bookmark { + for _, watcher := range c.watchersBuffer { + watcher.nonblockingAdd(event) + } + } else { + for _, watcher := range c.watchersBuffer { + watcher.add(event, c.timer, c.dispatchTimeoutBudget) + } } +} - c.finishDispatching() +func (c *Cacher) startDispatchingBookmarkEvents() { + // Pop already expired watchers. However, explicitly ignore stopped ones, + // as we don't delete watcher from bookmarkWatchers when it is stopped. + for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() { + for _, watcher := range watchers { + // watcher.stop() is protected by c.Lock() + if watcher.stopped { + continue + } + c.watchersBuffer = append(c.watchersBuffer, watcher) + // Given that we send bookmark event once at deadline-2s, never push again + // after the watcher pops up from the buckets. Once we decide to change the + // strategy to more sophisticated, we may need it here. + } + } } // startDispatching chooses watchers potentially interested in a given event @@ -712,6 +831,12 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) { // gain from avoiding memory allocations is much bigger. c.watchersBuffer = c.watchersBuffer[:0] + if event.Type == watch.Bookmark { + c.startDispatchingBookmarkEvents() + // return here to reduce following code indentation and diff + return + } + // Iterate over "allWatchers" no matter what the trigger function is. for _, watcher := range c.watchers.allWatchers { c.watchersBuffer = append(c.watchersBuffer, watcher) @@ -904,17 +1029,23 @@ type cacheWatcher struct { stopped bool forget func() versioner storage.Versioner + // The watcher will be closed by server after the deadline, + // save it here to send bookmark events before that. 
+ deadline time.Time + allowWatchBookmarks bool } -func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher { +func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool) *cacheWatcher { return &cacheWatcher{ - input: make(chan *watchCacheEvent, chanSize), - result: make(chan watch.Event, chanSize), - done: make(chan struct{}), - filter: filter, - stopped: false, - forget: forget, - versioner: versioner, + input: make(chan *watchCacheEvent, chanSize), + result: make(chan watch.Event, chanSize), + done: make(chan struct{}), + filter: filter, + stopped: false, + forget: forget, + versioner: versioner, + deadline: deadline, + allowWatchBookmarks: allowWatchBookmarks, } } @@ -928,6 +1059,9 @@ func (c *cacheWatcher) Stop() { c.forget() } +// TODO(#73958) +// stop() is protected by Cacher.Lock(), rename it to +// stopThreadUnsafe and remove the sync.Mutex. func (c *cacheWatcher) stop() { c.Lock() defer c.Unlock() @@ -938,12 +1072,20 @@ func (c *cacheWatcher) stop() { } } -func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) { - // Try to send the event immediately, without blocking. +func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { + // If we can't send it, don't block on it. select { case c.input <- event: - return + return true default: + return false + } +} + +func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) { + // Try to send the event immediately, without blocking. + if c.nonblockingAdd(event) { + return } // OK, block sending, but only for up to . @@ -972,8 +1114,20 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti budget.returnUnused(timeout - time.Since(startTime)) } -// NOTE: sendWatchCacheEvent is assumed to not modify !!! -func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { +func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) { + // For now we return 2s before deadline (and maybe +infinity is now already passed this time) + // but it gives us extensibility for the future(false when deadline is not set). + if c.deadline.IsZero() { + return c.deadline, false + } + return c.deadline.Add(-2 * time.Second), true +} + +func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { + if event.Type == watch.Bookmark { + return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()} + } + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) oldObjPasses := false if event.PrevObject != nil { @@ -981,22 +1135,32 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } if !curObjPasses && !oldObjPasses { // Watcher is not interested in that object. 
- return + return nil } - var watchEvent watch.Event switch { case curObjPasses && !oldObjPasses: - watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()} + return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()} case curObjPasses && oldObjPasses: - watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} + return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} case !curObjPasses && oldObjPasses: // return a delete event with the previous object content, but with the event's resource version oldObj := event.PrevObject.DeepCopyObject() if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil { utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err)) } - watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj} + return &watch.Event{Type: watch.Deleted, Object: oldObj} + } + + return nil +} + +// NOTE: sendWatchCacheEvent is assumed to not modify !!! +func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { + watchEvent := c.convertToWatchEvent(event) + if watchEvent == nil { + // Watcher is not interested in that object. + return } // We need to ensure that if we put event X to the c.result, all @@ -1018,7 +1182,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } select { - case c.result <- watchEvent: + case c.result <- *watchEvent: case <-c.done: } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 07dfcd5c5d..b7ab7d8efa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -32,13 +32,17 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/diff" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" ) // verifies the cacheWatcher.process goroutine is properly cleaned up even if @@ -63,7 +67,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, filter, forget, testVersioner{}) + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false) go w.process(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { @@ -183,8 +187,9 @@ TestCase: testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, filter, forget, testVersioner{}) + w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false) go w.process(context.Background(), testCase.events, 0) + ch := w.ResultChan() for j, event := range testCase.expected { e := <-ch @@ -461,7 +466,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { // timeout to zero and run the Stop goroutine concurrently. // May sure that the watch will not be blocked on Stop. 
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(0, filter, forget, testVersioner{}) + w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false) go w.Stop() select { case <-done: @@ -470,11 +475,12 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { } } + deadline := time.Now().Add(time.Hour) // After that, verifies the cacheWatcher.process goroutine works correctly. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}) + w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false) w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} - ctx, _ := context.WithTimeout(context.Background(), time.Hour) + ctx, _ := context.WithDeadline(context.Background(), deadline) go w.process(ctx, nil, 0) select { case <-w.ResultChan(): @@ -484,3 +490,210 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { w.Stop() } } + +func TestTimeBucketWatchersBasic(t *testing.T) { + filter := func(_ string, _ labels.Set, _ fields.Set) bool { + return true + } + forget := func() {} + + newWatcher := func(deadline time.Time) *cacheWatcher { + return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true) + } + + clock := clock.NewFakeClock(time.Now()) + watchers := newTimeBucketWatchers(clock) + now := clock.Now() + watchers.addWatcher(newWatcher(now.Add(10 * time.Second))) + watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) + watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) + + if len(watchers.watchersBuckets) != 2 { + t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets) + } + watchers0 := watchers.popExpiredWatchers() + if len(watchers0) != 0 { + t.Errorf("unexpected bucket size: %#v", watchers0) + } + + clock.Step(10 * time.Second) + watchers1 := watchers.popExpiredWatchers() + if len(watchers1) != 1 || len(watchers1[0]) != 1 { + t.Errorf("unexpected bucket size: %v", watchers1) + } + watchers1 = watchers.popExpiredWatchers() + if len(watchers1) != 0 { + t.Errorf("unexpected bucket size: %#v", watchers1) + } + + clock.Step(12 * time.Second) + watchers2 := watchers.popExpiredWatchers() + if len(watchers2) != 1 || len(watchers2[0]) != 2 { + t.Errorf("unexpected bucket size: %#v", watchers2) + } +} + +func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)() + backingStorage := &dummyStorage{} + cacher, _ := newTestCacher(backingStorage, 1000) + defer cacher.Stop() + + // Wait until cacher is initialized. 
+ cacher.ready.wait() + pred := storage.Everything + pred.AllowWatchBookmarks = allowWatchBookmarks + + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + w, err := cacher.Watch(ctx, "pods/ns", "0", pred) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + + resourceVersion := uint64(1000) + go func() { + deadline := time.Now().Add(time.Second) + for i := 0; time.Now().Before(deadline); i++ { + err = cacher.watchCache.Add(&examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", i), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%v", resourceVersion+uint64(i)), + }}) + if err != nil { + t.Fatalf("failed to add a pod: %v", err) + } + time.Sleep(100 * time.Millisecond) + } + }() + + timeoutCh := time.After(2 * time.Second) + lastObservedRV := uint64(0) + for { + select { + case event, ok := <-w.ResultChan(): + if !ok { + t.Fatal("Unexpected closed") + } + rv, err := cacher.versioner.ObjectResourceVersion(event.Object) + if err != nil { + t.Errorf("failed to parse resource version from %#v", event.Object) + } + if event.Type == watch.Bookmark { + if !expectedBookmarks { + t.Fatalf("Unexpected bookmark events received") + } + + if rv < lastObservedRV { + t.Errorf("Unexpected bookmark event resource version %v (last %v)", rv, lastObservedRV) + } + return + } + lastObservedRV = rv + case <-timeoutCh: + if expectedBookmarks { + t.Fatal("Unexpected timeout to receive a bookmark event") + } + return + } + } +} + +func TestCacherSendBookmarkEvents(t *testing.T) { + testCases := []struct { + watchCacheEnabled bool + allowWatchBookmarks bool + expectedBookmarks bool + }{ + { + watchCacheEnabled: true, + allowWatchBookmarks: true, + expectedBookmarks: true, + }, + { + watchCacheEnabled: true, + allowWatchBookmarks: false, + expectedBookmarks: false, + }, + { + watchCacheEnabled: false, + allowWatchBookmarks: true, + expectedBookmarks: false, + }, + { + watchCacheEnabled: false, + allowWatchBookmarks: false, + expectedBookmarks: false, + }, + } + + for _, tc := range testCases { + testCacherSendBookmarkEvents(t, tc.watchCacheEnabled, tc.allowWatchBookmarks, tc.expectedBookmarks) + } +} + +func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() + backingStorage := &dummyStorage{} + cacher, _ := newTestCacher(backingStorage, 1000) + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + + // Ensure there is some budget for slowing down processing. 
+ cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond) + + resourceVersion := uint64(1000) + err := cacher.watchCache.Add(&examplev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-0"), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%v", resourceVersion), + }}) + if err != nil { + t.Fatalf("failed to add a pod: %v", err) + } + + for i := 0; i < 1000; i++ { + pred := storage.Everything + pred.AllowWatchBookmarks = true + ctx, _ := context.WithTimeout(context.Background(), time.Second) + w, err := cacher.Watch(ctx, "pods/ns", "999", pred) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + bookmark := &watchCacheEvent{ + Type: watch.Bookmark, + ResourceVersion: uint64(i), + Object: cacher.newFunc(), + } + err = cacher.versioner.UpdateObject(bookmark.Object, bookmark.ResourceVersion) + if err != nil { + t.Fatalf("failure to update version of object (%d) %#v", bookmark.ResourceVersion, bookmark.Object) + } + + go func() { + cacher.dispatchEvent(bookmark) + }() + + go func() { + w.Stop() + }() + + done := make(chan struct{}) + go func() { + for range w.ResultChan() { + } + close(done) + }() + + select { + case <-done: + break + case <-time.After(time.Second): + t.Fatal("receive result timeout") + } + w.Stop() + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 8b30538dd8..97efdb98cf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" "k8s.io/client-go/tools/cache" + "k8s.io/klog" utiltrace "k8s.io/utils/trace" ) @@ -386,6 +387,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { w.onReplace() } w.cond.Broadcast() + klog.V(3).Infof("Replace watchCache (rev: %v) ", resourceVersion) return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go index bed741dfa7..b2f8c8e88e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/selection_predicate.go @@ -71,12 +71,13 @@ func (f AttrFunc) WithFieldMutation(fieldMutator FieldMutationFunc) AttrFunc { // SelectionPredicate is used to represent the way to select objects from api storage. 
type SelectionPredicate struct { - Label labels.Selector - Field fields.Selector - GetAttrs AttrFunc - IndexFields []string - Limit int64 - Continue string + Label labels.Selector + Field fields.Selector + GetAttrs AttrFunc + IndexFields []string + Limit int64 + Continue string + AllowWatchBookmarks bool } // Matches returns true if the given object's labels and fields (as diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD index 5fee66a22d..ce3f97d803 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/BUILD @@ -26,6 +26,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library", "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library", @@ -33,6 +34,8 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 95ebfba4e8..62d3af9de6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -40,6 +40,7 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" examplev1 "k8s.io/apiserver/pkg/apis/example/v1" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" cacherstorage "k8s.io/apiserver/pkg/storage/cacher" etcdstorage "k8s.io/apiserver/pkg/storage/etcd" @@ -47,6 +48,8 @@ import ( etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing" "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" + utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing" ) var ( @@ -121,6 +124,12 @@ func makeTestPod(name string) *example.Pod { } } +func createPod(s storage.Interface, obj *example.Pod) error { + key := "pods/" + obj.Namespace + "/" + obj.Name + out := &example.Pod{} + return s.Create(context.TODO(), key, obj, out, 0) +} + func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod { updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { return obj.DeepCopyObject(), nil, nil @@ -773,4 +782,129 @@ func TestCacherListerWatcherPagination(t *testing.T) { if limit2.Items[0].Name != podFoo.Name { t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name) } + +} + +func TestWatchDispatchBookmarkEvents(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() + + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher, v := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + fooCreated := updatePod(t, 
etcdStorage, makeTestPod("foo"), nil) + rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + startVersion := strconv.Itoa(int(rv)) + + tests := []struct { + timeout time.Duration + expected bool + allowWatchBookmark bool + }{ + { // test old client won't get Bookmark event + timeout: 2 * time.Second, + expected: false, + allowWatchBookmark: false, + }, + { + timeout: 2 * time.Second, + expected: true, + allowWatchBookmark: true, + }, + } + + for i, c := range tests { + pred := storage.Everything + pred.AllowWatchBookmarks = c.allowWatchBookmark + ctx, _ := context.WithTimeout(context.Background(), c.timeout) + watcher, err := cacher.Watch(ctx, "pods/ns/foo", startVersion, pred) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create events of other pods + updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-whatever-%d", i)), nil) + + // Now wait for Bookmark event + select { + case event, ok := <-watcher.ResultChan(): + if !ok && c.expected { + t.Errorf("Unexpected object watched (no objects)") + } + if c.expected && event.Type != watch.Bookmark { + t.Errorf("Unexpected object watched %#v", event) + } + case <-time.After(time.Second * 3): + if c.expected { + t.Errorf("Unexpected object watched (timeout)") + } + } + watcher.Stop() + } +} + +func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { + defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() + + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) + defer server.Terminate(t) + cacher, v := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + pred := storage.Everything + pred.AllowWatchBookmarks = true + ctx, _ := context.WithTimeout(context.Background(), 2*time.Second) + watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + + done := make(chan struct{}) + defer close(done) + go func() { + for i := 0; i < 100; i++ { + select { + case <-done: + return + default: + pod := fmt.Sprintf("foo-%d", i) + err := createPod(etcdStorage, makeTestPod(pod)) + if err != nil { + t.Fatalf("failed to create pod %v", pod) + } + time.Sleep(time.Second / 100) + } + } + }() + + bookmarkReceived := false + lastObservedResourceVersion := uint64(0) + for event := range watcher.ResultChan() { + rv, err := v.ObjectResourceVersion(event.Object) + if err != nil { + t.Fatalf("failed to parse resourceVersion from %#v", event) + } + if event.Type == watch.Bookmark { + bookmarkReceived = true + // bookmark event has a RV greater than or equal to the before one + if rv < lastObservedResourceVersion { + t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion) + } + } else { + // non-bookmark event has a RV greater than anything before + if rv <= lastObservedResourceVersion { + t.Fatalf("Unexpected event resourceVersion %v less than or equal to bookmark %v)", rv, lastObservedResourceVersion) + } + } + lastObservedResourceVersion = rv + } + // Make sure we have received a bookmark event + if !bookmarkReceived { + t.Fatalf("Unpexected error, we did not received a bookmark event") + } } diff --git a/staging/src/k8s.io/client-go/rest/watch/decoder.go b/staging/src/k8s.io/client-go/rest/watch/decoder.go index 73bb63addf..e95c020b2e 100644 --- a/staging/src/k8s.io/client-go/rest/watch/decoder.go +++ 
b/staging/src/k8s.io/client-go/rest/watch/decoder.go @@ -54,7 +54,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { return "", nil, fmt.Errorf("unable to decode to metav1.Event") } switch got.Type { - case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error): + case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark): default: return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) } diff --git a/staging/src/k8s.io/client-go/rest/watch/decoder_test.go b/staging/src/k8s.io/client-go/rest/watch/decoder_test.go index bfc581303f..2e02cf7526 100644 --- a/staging/src/k8s.io/client-go/rest/watch/decoder_test.go +++ b/staging/src/k8s.io/client-go/rest/watch/decoder_test.go @@ -42,7 +42,7 @@ func getDecoder() runtime.Decoder { } func TestDecoder(t *testing.T) { - table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error} + table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error, watch.Bookmark} for _, eventType := range table { out, in := io.Pipe() diff --git a/staging/src/k8s.io/client-go/rest/watch/encoder_test.go b/staging/src/k8s.io/client-go/rest/watch/encoder_test.go index 577b7010da..b56ce4324a 100644 --- a/staging/src/k8s.io/client-go/rest/watch/encoder_test.go +++ b/staging/src/k8s.io/client-go/rest/watch/encoder_test.go @@ -56,6 +56,10 @@ func TestEncodeDecodeRoundTrip(t *testing.T) { watch.Deleted, &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, }, + { + watch.Bookmark, + &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, } for i, testCase := range testCases { buf := &bytes.Buffer{} diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 023fd5e8e1..4b5daeedc2 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -271,6 +271,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { // We want to avoid situations of hanging watchers. Stop any wachers that do not // receive any events within the timeout window. TimeoutSeconds: &timeoutSeconds, + // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks. + // Reflector doesn't assume bookmarks are returned at all (if the server do not support + // watch bookmarks, it will ignore this field). + // Disabled in Alpha release of watch bookmarks feature. 
+ AllowWatchBookmarks: false, } w, err := r.listerWatcher.Watch(options) @@ -368,6 +373,8 @@ loop: if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } + case watch.Bookmark: + // A `Bookmark` means watch has synced here, just update the resourceVersion default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } diff --git a/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go b/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go index e45d58ec15..47ae9df4af 100644 --- a/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go +++ b/staging/src/k8s.io/client-go/tools/watch/retrywatcher.go @@ -153,7 +153,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) { // We need to inspect the event and get ResourceVersion out of it switch event.Type { - case watch.Added, watch.Modified, watch.Deleted: + case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark: metaObject, ok := event.Object.(resourceVersionGetter) if !ok { _ = rw.send(watch.Event{
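
As a sketch of how the new knob is meant to be consumed (this is not part of the patch): the changes above add the server-side plumbing (SelectionPredicate.AllowWatchBookmarks, the cacher's per-second bookmark buckets and dispatch path) and make the client-side decoder, reflector and RetryWatcher tolerate Bookmark events. Assuming a typed client whose Watch(metav1.ListOptions) returns a watch.Interface (as client-go typed clients do), a caller can opt in with AllowWatchBookmarks and treat Bookmark events purely as resume-point updates; the consumeWithBookmarks helper and its bookkeeping below are invented for this illustration.

    package watchexample

    import (
    	"fmt"

    	"k8s.io/apimachinery/pkg/api/meta"
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/apimachinery/pkg/watch"
    )

    // watchClient is an assumed interface; typed client-go clients satisfy a similar one.
    type watchClient interface {
    	Watch(opts metav1.ListOptions) (watch.Interface, error)
    }

    // consumeWithBookmarks runs one watch starting at lastSeenRV and returns the most
    // recent resourceVersion observed, advancing it on bookmark events as well as on
    // regular object events so a restarted watch can begin from a fresher point.
    func consumeWithBookmarks(c watchClient, lastSeenRV string, handle func(watch.Event)) (string, error) {
    	w, err := c.Watch(metav1.ListOptions{
    		ResourceVersion:     lastSeenRV,
    		AllowWatchBookmarks: true, // servers that do not support bookmarks ignore this field
    	})
    	if err != nil {
    		return lastSeenRV, err
    	}
    	defer w.Stop()

    	for ev := range w.ResultChan() {
    		switch ev.Type {
    		case watch.Added, watch.Modified, watch.Deleted:
    			handle(ev)
    			if acc, err := meta.Accessor(ev.Object); err == nil {
    				lastSeenRV = acc.GetResourceVersion()
    			}
    		case watch.Bookmark:
    			// No object changed; the event only carries a resourceVersion the client
    			// is already synced to, so just advance the resume point.
    			if acc, err := meta.Accessor(ev.Object); err == nil {
    				lastSeenRV = acc.GetResourceVersion()
    			}
    		case watch.Error:
    			return lastSeenRV, fmt.Errorf("watch ended with error: %#v", ev.Object)
    		}
    	}
    	return lastSeenRV, nil
    }

This is also why the reflector change only adds a watch.Bookmark case that updates the resourceVersion: the patch deliberately makes no guarantee about when (or whether) bookmarks arrive, so clients must treat them as best-effort hints, never as required events.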
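
The scheduling side can be restated in isolation: watcherBookmarkTimeBuckets keys each watcher by the Unix second of its next bookmark time, and a roughly one-second, jittered timer in dispatchEvents pops every bucket at or before "now", so a bookmark aimed at time X is delivered within about (X-1, X+1). The toy version below captures just that bucketing; string IDs stand in for *cacheWatcher, there is no locking, and it is an illustration rather than the cacher code.

    package watchexample

    import "time"

    // toyBuckets groups scheduled fire times by Unix second; precision of about
    // plus/minus one second is all bookmark delivery needs.
    type toyBuckets struct {
    	buckets map[int64][]string // bucket second -> watcher IDs
    	start   int64              // lowest second that has not been swept yet
    }

    func newToyBuckets(now time.Time) *toyBuckets {
    	return &toyBuckets{buckets: map[int64][]string{}, start: now.Unix()}
    }

    // add schedules id to fire at t; anything already in the past joins the oldest
    // unswept bucket so the next sweep still picks it up.
    func (b *toyBuckets) add(id string, t time.Time) {
    	sec := t.Unix()
    	if sec < b.start {
    		sec = b.start
    	}
    	b.buckets[sec] = append(b.buckets[sec], id)
    }

    // pop returns every watcher whose bucket second is <= now and advances the sweep cursor.
    func (b *toyBuckets) pop(now time.Time) []string {
    	var due []string
    	for ; b.start <= now.Unix(); b.start++ {
    		due = append(due, b.buckets[b.start]...)
    		delete(b.buckets, b.start)
    	}
    	return due
    }

Combined with nextBookmarkTime returning deadline minus two seconds, a watch whose request deadline is D normally receives its bookmark about two seconds before the server would close it, which is the point of the "send once at deadline-2s" comment in startDispatchingBookmarkEvents.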