mirror of https://github.com/k3s-io/k3s
Merge pull request #49992 from liggitt/debug-flake
Automatic merge from submit-queue (batch tested with PRs 49992, 48861, 49267, 49356, 49886) Correctly handle empty watch event cache Fixes https://github.com/kubernetes/kubernetes/issues/49956 Introduced bypull/6/headada60236f7
which did not adjust the oldest available resourceVersion for an empty watch event cache. Exposed by74b9ba3b4d
, which allowed controllers to get list results from etcd before the watch cache is ready (normally they list with resourceVersion=0 which serves the list request from the watch cache, blocking until it is ready) When the watch cache had an empty cache of watch events, it currently allows establishing a watch as if it can deliver a watch event for its currently synced resourceVersion. This results in an off-by-one error which can result in a missed watch event. Scenario: bob: 1. creates object at resourceVersion=11 sally: 1. does a list API request, gets a list resourceVersion of 10 (just before bob creates the object) 2. starts watch handled by watch cache at resourceVersion=10 Watch cache: 1. initial list gets resourceVersion=11, including the item created by bob 2. when determining the initial watch events to send to sally's watch, there are no watch events in the cache, so no initial watch events are sent. 3. the cache listerwatcher watches etcd starting at resourceVersion=11, so future events are fed into the event cache and to sally's watch The watch cache should have dropped sally's watch from resourceVersion=10 with a "gone" error, since it can't deliver the watch event for resourceVersion=11. This would force sally to relist (where she would get a list at resourceVersion=11) and rewatch (from resourceVersion=11) This particularly affects tests that create CRD/TPRs and establish watches on the new types as the storage layer's watch cache is also populating for that type. ```release-note Fix a bug in watch cache sometimes causing missing events after watch cache initialization. ```
commit
35c3a51e2c
|
@ -537,6 +537,70 @@ func TestStartingResourceVersion(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestEmptyWatchEventCache(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
|
||||
// add a few objects
|
||||
updatePod(t, etcdStorage, makeTestPod("pod1"), nil)
|
||||
updatePod(t, etcdStorage, makeTestPod("pod2"), nil)
|
||||
updatePod(t, etcdStorage, makeTestPod("pod3"), nil)
|
||||
updatePod(t, etcdStorage, makeTestPod("pod4"), nil)
|
||||
updatePod(t, etcdStorage, makeTestPod("pod5"), nil)
|
||||
|
||||
fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil)
|
||||
|
||||
// get rv of last pod created
|
||||
rv, err := storage.ParseWatchResourceVersion(fooCreated.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
cacher := newTestCacher(etcdStorage, 10)
|
||||
defer cacher.Stop()
|
||||
|
||||
// We now have a cacher with an empty cache of watch events and a resourceVersion of rv.
|
||||
// It should support establishing watches from rv and higher, but not older.
|
||||
|
||||
{
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv-1)), storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer watcher.Stop()
|
||||
expectedGoneError := errors.NewGone("").ErrStatus
|
||||
verifyWatchEvent(t, watcher, watch.Error, &expectedGoneError)
|
||||
}
|
||||
|
||||
{
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv+1)), storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer watcher.Stop()
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
t.Errorf("unexpected event %#v", e)
|
||||
case <-time.After(3 * time.Second):
|
||||
// watch from rv+1 remained established successfully
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv)), storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer watcher.Stop()
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
t.Errorf("unexpected event %#v", e)
|
||||
case <-time.After(3 * time.Second):
|
||||
// watch from rv remained established successfully
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRandomWatchDeliver(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcdtest.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
|
|
|
@ -412,7 +412,9 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
|
|||
|
||||
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
|
||||
size := w.endIndex - w.startIndex
|
||||
oldest := w.resourceVersion
|
||||
// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
|
||||
// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
|
||||
oldest := w.resourceVersion + 1
|
||||
if size > 0 {
|
||||
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue