Merge pull request #75474 from hormes/add_heartbeat_inside_watch

add heartbeat inside watch
k3s-v1.15.3
Kubernetes Prow Robot 2019-04-16 13:55:10 -07:00 committed by GitHub
commit 84a859fbcf
14 changed files with 572 additions and 40 deletions

View File

@ -55,7 +55,7 @@ func (d *decoratedWatcher) run(ctx context.Context) {
return
}
switch recv.Type {
case watch.Added, watch.Modified, watch.Deleted:
case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
err := d.decorator(recv.Object)
if err != nil {
send = makeStatusErrorEvent(err)

View File

@ -1075,6 +1075,7 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
resourceVersion := ""
if options != nil {
resourceVersion = options.ResourceVersion
predicate.AllowWatchBookmarks = options.AllowWatchBookmarks
}
return e.WatchPredicate(ctx, predicate, resourceVersion)
}
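
As context for the hunk above: the AllowWatchBookmarks flag copied into the predicate comes from the client's list/watch options. A minimal sketch (not part of this change) of how a client would set it, assuming metav1 is "k8s.io/apimachinery/pkg/apis/meta/v1" and the bookmarkWatchOptions name is purely illustrative:

// bookmarkWatchOptions builds list options for a watch that opts in to
// server-sent bookmark (heartbeat) events.
func bookmarkWatchOptions() metav1.ListOptions {
	return metav1.ListOptions{
		Watch:               true,
		AllowWatchBookmarks: true, // servers that do not support bookmarks simply ignore this
		ResourceVersion:     "0",
	}
}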

View File

@ -59,8 +59,11 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
@ -159,6 +160,53 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache
}
}
// As we don't need high precision here, we group all watchers whose timeouts fall
// within the same second into one bucket and pop them together once that second
// expires. More specifically, if the fire time is set to X, the bookmark is
// delivered somewhere within the (X-1, X+1) period.
// This is NOT thread-safe.
type watcherBookmarkTimeBuckets struct {
watchersBuckets map[int64][]*cacheWatcher
startBucketID int64
clock clock.Clock
}
func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets {
return &watcherBookmarkTimeBuckets{
watchersBuckets: make(map[int64][]*cacheWatcher),
startBucketID: clock.Now().Unix(),
clock: clock,
}
}
// addWatcher adds a watcher to the bucket matching its bookmark deadline; if the
// deadline is before the start bucket, the watcher is added to the start bucket instead.
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
nextTime, ok := w.nextBookmarkTime(t.clock.Now())
if !ok {
return false
}
bucketID := nextTime.Unix()
if bucketID < t.startBucketID {
bucketID = t.startBucketID
}
watchers, _ := t.watchersBuckets[bucketID]
t.watchersBuckets[bucketID] = append(watchers, w)
return true
}
func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
currentBucketID := t.clock.Now().Unix()
// There should be one or two elements in almost all cases
expiredWatchers := make([][]*cacheWatcher, 0, 2)
for ; t.startBucketID <= currentBucketID; t.startBucketID++ {
if watchers, ok := t.watchersBuckets[t.startBucketID]; ok {
delete(t.watchersBuckets, t.startBucketID)
expiredWatchers = append(expiredWatchers, watchers)
}
}
return expiredWatchers
}
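
To make the bucketing above concrete, here is a simplified standalone sketch (not part of this change) that swaps cacheWatcher and clock.Clock for plain strings and an injected now() function; all names in it are illustrative:

package main

import (
	"fmt"
	"time"
)

// timeBuckets groups deadlines at one-second granularity; popping advances the
// start bucket up to "now" and returns everything that expired in between.
type timeBuckets struct {
	buckets map[int64][]string
	startID int64
	now     func() time.Time
}

func newTimeBuckets(now func() time.Time) *timeBuckets {
	return &timeBuckets{buckets: map[int64][]string{}, startID: now().Unix(), now: now}
}

func (t *timeBuckets) add(name string, deadline time.Time) {
	id := deadline.Unix()
	if id < t.startID {
		id = t.startID // never schedule into an already-popped bucket
	}
	t.buckets[id] = append(t.buckets[id], name)
}

func (t *timeBuckets) popExpired() [][]string {
	var expired [][]string
	for cur := t.now().Unix(); t.startID <= cur; t.startID++ {
		if items, ok := t.buckets[t.startID]; ok {
			delete(t.buckets, t.startID)
			expired = append(expired, items)
		}
	}
	return expired
}

func main() {
	base := time.Now()
	current := base
	tb := newTimeBuckets(func() time.Time { return current })
	tb.add("w1", base.Add(10*time.Second))
	tb.add("w2", base.Add(20*time.Second))

	current = base.Add(10 * time.Second)
	fmt.Println(tb.popExpired()) // [[w1]]
	current = base.Add(21 * time.Second)
	fmt.Println(tb.popExpired()) // [[w2]]
}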
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
// Cacher is responsible for serving WATCH and LIST requests for a given
@ -197,6 +245,9 @@ type Cacher struct {
// Versioner is used to handle resource versions.
versioner storage.Versioner
// newFunc is a function that creates a new empty object of the type this cacher stores.
newFunc func() runtime.Object
// triggerFunc is used to optimize the number of watchers that need to process
// an incoming event.
triggerFunc storage.TriggerPublisherFunc
@ -215,6 +266,7 @@ type Cacher struct {
stopCh chan struct{}
stopWg sync.WaitGroup
clock clock.Clock
// timer is used to avoid unnecessary allocations in underlying watchers.
timer *time.Timer
@ -228,6 +280,10 @@ type Cacher struct {
// during current dispatching, but stopping was deferred to the end of
// dispatching that event to avoid race with closing channels in watchers.
watchersToStop []*cacheWatcher
// Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers *watcherBookmarkTimeBuckets
// watchBookmark feature-gate
watchBookmarkEnabled bool
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
@ -251,6 +307,8 @@ func NewCacherFromConfig(config Config) *Cacher {
// Configure the reflector's pager with an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to a full list if paginated list calls fail due to an "Expired" error.
reflector.WatchListPageSize = storageWatchListPageSize
clock := clock.RealClock{}
cacher := &Cacher{
ready: newReady(),
storage: config.Storage,
@ -258,6 +316,7 @@ func NewCacherFromConfig(config Config) *Cacher {
watchCache: watchCache,
reflector: reflector,
versioner: config.Versioner,
newFunc: config.NewFunc,
triggerFunc: config.TriggerPublisherFunc,
watcherIdx: 0,
watchers: indexedWatchers{
@ -272,8 +331,11 @@ func NewCacherFromConfig(config Config) *Cacher {
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
stopCh: stopCh,
timer: time.NewTimer(time.Duration(0)),
stopCh: stopCh,
clock: clock,
timer: time.NewTimer(time.Duration(0)),
bookmarkWatchers: newTimeBucketWatchers(clock),
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
}
watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents()
@ -375,11 +437,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
chanSize = 1000
}
// Determine the watch timeout (a zero deadline means none was set, so the check is skipped).
deadline, _ := ctx.Deadline()
// Create a watcher here to reduce memory allocations under lock,
// given that memory allocation may trigger GC and block the thread.
// Also note that emptyFunc is a placeholder until we are able to compute
// the watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner)
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks)
// We explicitly use thread unsafe version and do locking ourself to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
@ -409,6 +473,11 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when server and client support watch bookmarks.
if c.watchBookmarkEnabled && watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
@ -672,6 +741,15 @@ func (c *Cacher) processEvent(event *watchCacheEvent) {
}
func (c *Cacher) dispatchEvents() {
// Jitter to help level out any aggregate load.
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
// Stop and drain the timer when the WatchBookmark feature gate is not enabled.
if !c.watchBookmarkEnabled && !bookmarkTimer.Stop() {
<-bookmarkTimer.C()
}
defer bookmarkTimer.Stop()
lastProcessedResourceVersion := uint64(0)
for {
select {
case event, ok := <-c.incoming:
@ -679,6 +757,24 @@ func (c *Cacher) dispatchEvents() {
return
}
c.dispatchEvent(&event)
lastProcessedResourceVersion = event.ResourceVersion
case <-bookmarkTimer.C():
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we have not seen any event yet; this is fine
// because we don't provide any guarantees on sending bookmarks.
if lastProcessedResourceVersion == 0 {
continue
}
bookmarkEvent := &watchCacheEvent{
Type: watch.Bookmark,
Object: c.newFunc(),
ResourceVersion: lastProcessedResourceVersion,
}
if err := c.versioner.UpdateObject(bookmarkEvent.Object, bookmarkEvent.ResourceVersion); err != nil {
klog.Errorf("failure to set resourceVersion to %d on bookmark event %+v", bookmarkEvent.ResourceVersion, bookmarkEvent.Object)
continue
}
c.dispatchEvent(bookmarkEvent)
case <-c.stopCh:
return
}
@ -687,13 +783,36 @@ func (c *Cacher) dispatchEvents() {
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
c.startDispatching(event)
defer c.finishDispatching()
// Watchers stopped after startDispatching will be delayed until finishDispatching.
// Since add() can block, we explicitly add only while the cacher is unlocked.
for _, watcher := range c.watchersBuffer {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
if event.Type == watch.Bookmark {
for _, watcher := range c.watchersBuffer {
watcher.nonblockingAdd(event)
}
} else {
for _, watcher := range c.watchersBuffer {
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
}
}
}
c.finishDispatching()
func (c *Cacher) startDispatchingBookmarkEvents() {
// Pop already-expired watchers, but explicitly skip stopped ones, since a watcher
// is not removed from bookmarkWatchers when it is stopped.
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
for _, watcher := range watchers {
// watcher.stop() is protected by c.Lock()
if watcher.stopped {
continue
}
c.watchersBuffer = append(c.watchersBuffer, watcher)
// Given that we send the bookmark event only once, at deadline-2s, we never re-add
// the watcher after it pops out of the buckets. If we ever switch to a more
// sophisticated strategy, re-adding it would happen here.
}
}
}
// startDispatching chooses watchers potentially interested in a given event
@ -712,6 +831,12 @@ func (c *Cacher) startDispatching(event *watchCacheEvent) {
// gain from avoiding memory allocations is much bigger.
c.watchersBuffer = c.watchersBuffer[:0]
if event.Type == watch.Bookmark {
c.startDispatchingBookmarkEvents()
// return here to reduce following code indentation and diff
return
}
// Iterate over "allWatchers" no matter what the trigger function is.
for _, watcher := range c.watchers.allWatchers {
c.watchersBuffer = append(c.watchersBuffer, watcher)
@ -904,17 +1029,23 @@ type cacheWatcher struct {
stopped bool
forget func()
versioner storage.Versioner
// The watcher will be closed by the server after the deadline;
// save it here so bookmark events can be sent before that happens.
deadline time.Time
allowWatchBookmarks bool
}
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner) *cacheWatcher {
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool) *cacheWatcher {
return &cacheWatcher{
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
input: make(chan *watchCacheEvent, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
filter: filter,
stopped: false,
forget: forget,
versioner: versioner,
deadline: deadline,
allowWatchBookmarks: allowWatchBookmarks,
}
}
@ -928,6 +1059,9 @@ func (c *cacheWatcher) Stop() {
c.forget()
}
// TODO(#73958)
// stop() is protected by Cacher.Lock(), rename it to
// stopThreadUnsafe and remove the sync.Mutex.
func (c *cacheWatcher) stop() {
c.Lock()
defer c.Unlock()
@ -938,12 +1072,20 @@ func (c *cacheWatcher) stop() {
}
}
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
// Try to send the event immediately, without blocking.
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
// If we can't send it, don't block on it.
select {
case c.input <- event:
return
return true
default:
return false
}
}
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
// Try to send the event immediately, without blocking.
if c.nonblockingAdd(event) {
return
}
// OK, block sending, but only for up to <timeout>.
@ -972,8 +1114,20 @@ func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *ti
budget.returnUnused(timeout - time.Since(startTime))
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
// For now we simply return a time 2s before the deadline (which may already have passed),
// and (zero, false) when no deadline is set; this leaves room for other strategies later.
if c.deadline.IsZero() {
return c.deadline, false
}
return c.deadline.Add(-2 * time.Second), true
}
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
if event.Type == watch.Bookmark {
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
}
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
oldObjPasses := false
if event.PrevObject != nil {
@ -981,22 +1135,32 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
if !curObjPasses && !oldObjPasses {
// Watcher is not interested in that object.
return
return nil
}
var watchEvent watch.Event
switch {
case curObjPasses && !oldObjPasses:
watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
case curObjPasses && oldObjPasses:
watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
case !curObjPasses && oldObjPasses:
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
return &watch.Event{Type: watch.Deleted, Object: oldObj}
}
return nil
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
watchEvent := c.convertToWatchEvent(event)
if watchEvent == nil {
// Watcher is not interested in that object.
return
}
// We need to ensure that if we put event X to the c.result, all
@ -1018,7 +1182,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
}
select {
case c.result <- watchEvent:
case c.result <- *watchEvent:
case <-c.done:
}
}
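
The jittered bookmark timer in dispatchEvents above relies on Go's stop-and-drain idiom so that a disabled timer can never fire into the select loop. A standalone sketch of just that pattern (illustrative only, using the standard library time.Timer instead of the clock abstraction):

package main

import (
	"fmt"
	"time"
)

func main() {
	enabled := false // stands in for the WatchBookmark feature gate
	heartbeat := time.NewTimer(time.Second)
	// If heartbeats are disabled, stop the timer; when Stop reports that the timer
	// already fired, drain the channel so a later select cannot see a stale tick.
	if !enabled && !heartbeat.Stop() {
		<-heartbeat.C
	}
	defer heartbeat.Stop()

	work := time.After(100 * time.Millisecond)
	select {
	case <-heartbeat.C:
		fmt.Println("heartbeat tick") // unreachable while disabled
	case <-work:
		fmt.Println("regular event")
	}
}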

View File

@ -32,13 +32,17 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
)
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
@ -63,7 +67,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
}
// set the size of the buffer of w.result to 0, so that writes to
// w.result are blocked.
w = newCacheWatcher(0, filter, forget, testVersioner{})
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
go w.process(context.Background(), initEvents, 0)
w.Stop()
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
@ -183,8 +187,9 @@ TestCase:
testCase.events[j].ResourceVersion = uint64(j) + 1
}
w := newCacheWatcher(0, filter, forget, testVersioner{})
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
go w.process(context.Background(), testCase.events, 0)
ch := w.ResultChan()
for j, event := range testCase.expected {
e := <-ch
@ -461,7 +466,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
// timeout to zero and run the Stop goroutine concurrently.
// Make sure that the watch will not be blocked on Stop.
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(0, filter, forget, testVersioner{})
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
go w.Stop()
select {
case <-done:
@ -470,11 +475,12 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
}
}
deadline := time.Now().Add(time.Hour)
// After that, verifies the cacheWatcher.process goroutine works correctly.
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{})
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false)
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
ctx, _ := context.WithTimeout(context.Background(), time.Hour)
ctx, _ := context.WithDeadline(context.Background(), deadline)
go w.process(ctx, nil, 0)
select {
case <-w.ResultChan():
@ -484,3 +490,210 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
w.Stop()
}
}
func TestTimeBucketWatchersBasic(t *testing.T) {
filter := func(_ string, _ labels.Set, _ fields.Set) bool {
return true
}
forget := func() {}
newWatcher := func(deadline time.Time) *cacheWatcher {
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true)
}
clock := clock.NewFakeClock(time.Now())
watchers := newTimeBucketWatchers(clock)
now := clock.Now()
watchers.addWatcher(newWatcher(now.Add(10 * time.Second)))
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
watchers.addWatcher(newWatcher(now.Add(20 * time.Second)))
if len(watchers.watchersBuckets) != 2 {
t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets)
}
watchers0 := watchers.popExpiredWatchers()
if len(watchers0) != 0 {
t.Errorf("unexpected bucket size: %#v", watchers0)
}
clock.Step(10 * time.Second)
watchers1 := watchers.popExpiredWatchers()
if len(watchers1) != 1 || len(watchers1[0]) != 1 {
t.Errorf("unexpected bucket size: %v", watchers1)
}
watchers1 = watchers.popExpiredWatchers()
if len(watchers1) != 0 {
t.Errorf("unexpected bucket size: %#v", watchers1)
}
clock.Step(12 * time.Second)
watchers2 := watchers.popExpiredWatchers()
if len(watchers2) != 1 || len(watchers2[0]) != 2 {
t.Errorf("unexpected bucket size: %#v", watchers2)
}
}
func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)()
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
pred := storage.Everything
pred.AllowWatchBookmarks = allowWatchBookmarks
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "0", pred)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
resourceVersion := uint64(1000)
go func() {
deadline := time.Now().Add(time.Second)
for i := 0; time.Now().Before(deadline); i++ {
err = cacher.watchCache.Add(&examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", i),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%v", resourceVersion+uint64(i)),
}})
if err != nil {
t.Fatalf("failed to add a pod: %v", err)
}
time.Sleep(100 * time.Millisecond)
}
}()
timeoutCh := time.After(2 * time.Second)
lastObservedRV := uint64(0)
for {
select {
case event, ok := <-w.ResultChan():
if !ok {
t.Fatal("Unexpected closed")
}
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
if err != nil {
t.Errorf("failed to parse resource version from %#v", event.Object)
}
if event.Type == watch.Bookmark {
if !expectedBookmarks {
t.Fatalf("Unexpected bookmark events received")
}
if rv < lastObservedRV {
t.Errorf("Unexpected bookmark event resource version %v (last %v)", rv, lastObservedRV)
}
return
}
lastObservedRV = rv
case <-timeoutCh:
if expectedBookmarks {
t.Fatal("Unexpected timeout to receive a bookmark event")
}
return
}
}
}
func TestCacherSendBookmarkEvents(t *testing.T) {
testCases := []struct {
watchCacheEnabled bool
allowWatchBookmarks bool
expectedBookmarks bool
}{
{
watchCacheEnabled: true,
allowWatchBookmarks: true,
expectedBookmarks: true,
},
{
watchCacheEnabled: true,
allowWatchBookmarks: false,
expectedBookmarks: false,
},
{
watchCacheEnabled: false,
allowWatchBookmarks: true,
expectedBookmarks: false,
},
{
watchCacheEnabled: false,
allowWatchBookmarks: false,
expectedBookmarks: false,
},
}
for _, tc := range testCases {
testCacherSendBookmarkEvents(t, tc.watchCacheEnabled, tc.allowWatchBookmarks, tc.expectedBookmarks)
}
}
func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 1000)
defer cacher.Stop()
// Wait until cacher is initialized.
cacher.ready.wait()
// Ensure there is some budget for slowing down processing.
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
resourceVersion := uint64(1000)
err := cacher.watchCache.Add(&examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-0"),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%v", resourceVersion),
}})
if err != nil {
t.Fatalf("failed to add a pod: %v", err)
}
for i := 0; i < 1000; i++ {
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "999", pred)
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
bookmark := &watchCacheEvent{
Type: watch.Bookmark,
ResourceVersion: uint64(i),
Object: cacher.newFunc(),
}
err = cacher.versioner.UpdateObject(bookmark.Object, bookmark.ResourceVersion)
if err != nil {
t.Fatalf("failure to update version of object (%d) %#v", bookmark.ResourceVersion, bookmark.Object)
}
go func() {
cacher.dispatchEvent(bookmark)
}()
go func() {
w.Stop()
}()
done := make(chan struct{})
go func() {
for range w.ResultChan() {
}
close(done)
}()
select {
case <-done:
break
case <-time.After(time.Second):
t.Fatal("receive result timeout")
}
w.Stop()
}
}

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
utiltrace "k8s.io/utils/trace"
)
@ -386,6 +387,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
w.onReplace()
}
w.cond.Broadcast()
klog.V(3).Infof("Replace watchCache (rev: %v) ", resourceVersion)
return nil
}

View File

@ -71,12 +71,13 @@ func (f AttrFunc) WithFieldMutation(fieldMutator FieldMutationFunc) AttrFunc {
// SelectionPredicate is used to represent the way to select objects from api storage.
type SelectionPredicate struct {
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
IndexFields []string
Limit int64
Continue string
Label labels.Selector
Field fields.Selector
GetAttrs AttrFunc
IndexFields []string
Limit int64
Continue string
AllowWatchBookmarks bool
}
// Matches returns true if the given object's labels and fields (as

View File

@ -26,6 +26,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/cacher:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
@ -33,6 +34,8 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/storage/etcd/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
],
)

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
cacherstorage "k8s.io/apiserver/pkg/storage/cacher"
etcdstorage "k8s.io/apiserver/pkg/storage/etcd"
@ -47,6 +48,8 @@ import (
etcdtesting "k8s.io/apiserver/pkg/storage/etcd/testing"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
)
var (
@ -121,6 +124,12 @@ func makeTestPod(name string) *example.Pod {
}
}
func createPod(s storage.Interface, obj *example.Pod) error {
key := "pods/" + obj.Namespace + "/" + obj.Name
out := &example.Pod{}
return s.Create(context.TODO(), key, obj, out, 0)
}
func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *example.Pod {
updateFn := func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return obj.DeepCopyObject(), nil, nil
@ -773,4 +782,129 @@ func TestCacherListerWatcherPagination(t *testing.T) {
if limit2.Items[0].Name != podFoo.Name {
t.Errorf("Expected list2.Items[0] to be %s but got %s", podFoo.Name, limit2.Items[0].Name)
}
}
func TestWatchDispatchBookmarkEvents(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
startVersion := strconv.Itoa(int(rv))
tests := []struct {
timeout time.Duration
expected bool
allowWatchBookmark bool
}{
{ // verify that an old client (not requesting bookmarks) does not get a Bookmark event
timeout: 2 * time.Second,
expected: false,
allowWatchBookmark: false,
},
{
timeout: 2 * time.Second,
expected: true,
allowWatchBookmark: true,
},
}
for i, c := range tests {
pred := storage.Everything
pred.AllowWatchBookmarks = c.allowWatchBookmark
ctx, _ := context.WithTimeout(context.Background(), c.timeout)
watcher, err := cacher.Watch(ctx, "pods/ns/foo", startVersion, pred)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Create events of other pods
updatePod(t, etcdStorage, makeTestPod(fmt.Sprintf("foo-whatever-%d", i)), nil)
// Now wait for Bookmark event
select {
case event, ok := <-watcher.ResultChan():
if !ok && c.expected {
t.Errorf("Unexpected object watched (no objects)")
}
if c.expected && event.Type != watch.Bookmark {
t.Errorf("Unexpected object watched %#v", event)
}
case <-time.After(time.Second * 3):
if c.expected {
t.Errorf("Unexpected object watched (timeout)")
}
}
watcher.Stop()
}
}
func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
defer utilfeaturetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)()
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
defer server.Terminate(t)
cacher, v := newTestCacher(etcdStorage, 10)
defer cacher.Stop()
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
done := make(chan struct{})
defer close(done)
go func() {
for i := 0; i < 100; i++ {
select {
case <-done:
return
default:
pod := fmt.Sprintf("foo-%d", i)
err := createPod(etcdStorage, makeTestPod(pod))
if err != nil {
t.Fatalf("failed to create pod %v", pod)
}
time.Sleep(time.Second / 100)
}
}
}()
bookmarkReceived := false
lastObservedResourceVersion := uint64(0)
for event := range watcher.ResultChan() {
rv, err := v.ObjectResourceVersion(event.Object)
if err != nil {
t.Fatalf("failed to parse resourceVersion from %#v", event)
}
if event.Type == watch.Bookmark {
bookmarkReceived = true
// a bookmark event has an RV greater than or equal to the previous one
if rv < lastObservedResourceVersion {
t.Fatalf("Unexpected bookmark resourceVersion %v less than observed %v)", rv, lastObservedResourceVersion)
}
} else {
// a non-bookmark event has an RV strictly greater than anything seen before
if rv <= lastObservedResourceVersion {
t.Fatalf("Unexpected event resourceVersion %v less than or equal to the last observed %v", rv, lastObservedResourceVersion)
}
}
lastObservedResourceVersion = rv
}
// Make sure we have received a bookmark event
if !bookmarkReceived {
t.Fatalf("Unpexected error, we did not received a bookmark event")
}
}

View File

@ -54,7 +54,7 @@ func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
return "", nil, fmt.Errorf("unable to decode to metav1.Event")
}
switch got.Type {
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error):
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error), string(watch.Bookmark):
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}

View File

@ -42,7 +42,7 @@ func getDecoder() runtime.Decoder {
}
func TestDecoder(t *testing.T) {
table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error}
table := []watch.EventType{watch.Added, watch.Deleted, watch.Modified, watch.Error, watch.Bookmark}
for _, eventType := range table {
out, in := io.Pipe()

View File

@ -56,6 +56,10 @@ func TestEncodeDecodeRoundTrip(t *testing.T) {
watch.Deleted,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
{
watch.Bookmark,
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
},
}
for i, testCase := range testCases {
buf := &bytes.Buffer{}

View File

@ -271,6 +271,11 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// We want to avoid situations of hanging watchers. Stop any watchers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server does not support
// watch bookmarks, it will ignore this field).
// Disabled in the Alpha release of the watch bookmarks feature.
AllowWatchBookmarks: false,
}
w, err := r.listerWatcher.Watch(options)
@ -368,6 +373,8 @@ loop:
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
}
case watch.Bookmark:
// A `Bookmark` event means the watch has synced up to this point; just update the resourceVersion
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
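
To show how a consumer benefits from these heartbeats, here is a hedged sketch (not part of this change) of a reflector-style loop that advances its last-seen resourceVersion on every event, including Bookmark, so a restarted watch can resume from a fresher point instead of forcing a relist; the followWatch name and the way the watch.Interface is obtained are assumptions:

package watchutil

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/watch"
)

// followWatch drains a watch and returns the last resourceVersion it observed.
// Bookmark events carry no object changes to apply; they only move the version forward.
func followWatch(w watch.Interface, handle func(watch.Event)) (string, error) {
	lastRV := ""
	for event := range w.ResultChan() {
		switch event.Type {
		case watch.Added, watch.Modified, watch.Deleted:
			handle(event)
		case watch.Bookmark:
			// Nothing to reconcile; just record the resourceVersion below.
		case watch.Error:
			return lastRV, fmt.Errorf("watch ended with error: %#v", event.Object)
		}
		if accessor, err := meta.Accessor(event.Object); err == nil {
			lastRV = accessor.GetResourceVersion()
		}
	}
	return lastRV, nil
}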

View File

@ -153,7 +153,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// We need to inspect the event and get ResourceVersion out of it
switch event.Type {
case watch.Added, watch.Modified, watch.Deleted:
case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
_ = rw.send(watch.Event{