diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index ef0c4f38de..447be77997 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -537,6 +537,70 @@ func TestStartingResourceVersion(t *testing.T) { } } +func TestEmptyWatchEventCache(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) + defer server.Terminate(t) + + // add a few objects + updatePod(t, etcdStorage, makeTestPod("pod1"), nil) + updatePod(t, etcdStorage, makeTestPod("pod2"), nil) + updatePod(t, etcdStorage, makeTestPod("pod3"), nil) + updatePod(t, etcdStorage, makeTestPod("pod4"), nil) + updatePod(t, etcdStorage, makeTestPod("pod5"), nil) + + fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) + + // get rv of last pod created + rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + cacher := newTestCacher(etcdStorage, 10) + defer cacher.Stop() + + // We now have a cacher with an empty cache of watch events and a resourceVersion of rv. + // It should support establishing watches from rv and higher, but not older. + + { + watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv-1)), storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + expectedGoneError := errors.NewGone("").ErrStatus + verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError) + } + + { + watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv+1)), storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + select { + case e := <-watcher.ResultChan(): + t.Errorf("unexpected event %#v", e) + case <-time.After(3 * time.Second): + // watch from rv+1 remained established successfully + } + } + + { + watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv)), storage.Everything) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer watcher.Stop() + select { + case e := <-watcher.ResultChan(): + t.Errorf("unexpected event %#v", e) + case <-time.After(3 * time.Second): + // watch from rv remained established successfully + } + } +} + func TestRandomWatchDeliver(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix()) defer server.Terminate(t) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go index 1268b9d7a5..602312b605 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/watch_cache.go @@ -412,7 +412,9 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) { func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) { size := w.endIndex - w.startIndex - oldest := w.resourceVersion + // if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher + // is the *next* event we'll receive, which will be at least one greater than our current resourceVersion + oldest := w.resourceVersion + 1 if size > 0 { oldest = w.cache[w.startIndex%w.capacity].resourceVersion }