mirror of https://github.com/k3s-io/k3s
Merge pull request #73845 from wojtek-t/fix_watcher_going_back_in_time
Avoid going back in time in watchcache watcherspull/564/head
commit
fd633d192f
|
@ -347,6 +347,13 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||
chanSize = 1000
|
||||
}
|
||||
|
||||
// With some events already sent, update resourceVersion so that
|
||||
// events that were buffered and not yet processed won't be delivered
|
||||
// to this watcher second time causing going back in time.
|
||||
if len(initEvents) > 0 {
|
||||
watchRV = initEvents[len(initEvents)-1].ResourceVersion
|
||||
}
|
||||
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
||||
|
|
|
@ -213,7 +213,15 @@ func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
|
|||
return fmt.Errorf("unimplemented")
|
||||
}
|
||||
func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
|
||||
return 0, fmt.Errorf("unimplemented")
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
version := accessor.GetResourceVersion()
|
||||
if len(version) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
return strconv.ParseUint(version, 10, 64)
|
||||
}
|
||||
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
|
||||
return strconv.ParseUint(resourceVersion, 10, 64)
|
||||
|
@ -351,3 +359,82 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
|
|||
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatcherNotGoingBackInTime(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _ := newTestCacher(backingStorage, 1000)
|
||||
defer cacher.Stop()
|
||||
|
||||
// Wait until cacher is initialized.
|
||||
cacher.ready.wait()
|
||||
|
||||
// Ensure there is some budget for slowing down processing.
|
||||
cacher.dispatchTimeoutBudget.returnUnused(100 * time.Millisecond)
|
||||
|
||||
makePod := func(i int) *examplev1.Pod {
|
||||
return &examplev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod-%d", 1000+i),
|
||||
Namespace: "ns",
|
||||
ResourceVersion: fmt.Sprintf("%d", 1000+i),
|
||||
},
|
||||
}
|
||||
}
|
||||
if err := cacher.watchCache.Add(makePod(0)); err != nil {
|
||||
t.Errorf("error: %v", err)
|
||||
}
|
||||
|
||||
totalPods := 100
|
||||
|
||||
// Create watcher that will be slowing down reading.
|
||||
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
}
|
||||
defer w1.Stop()
|
||||
go func() {
|
||||
a := 0
|
||||
for range w1.ResultChan() {
|
||||
time.Sleep(time.Millisecond)
|
||||
a++
|
||||
if a == 100 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Now push a ton of object to cache.
|
||||
for i := 1; i < totalPods; i++ {
|
||||
cacher.watchCache.Add(makePod(i))
|
||||
}
|
||||
|
||||
// Create fast watcher and ensure it will get each object exactly once.
|
||||
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create watch: %v", err)
|
||||
}
|
||||
defer w2.Stop()
|
||||
|
||||
shouldContinue := true
|
||||
currentRV := uint64(0)
|
||||
for shouldContinue {
|
||||
select {
|
||||
case event, ok := <-w2.ResultChan():
|
||||
if !ok {
|
||||
shouldContinue = false
|
||||
break
|
||||
}
|
||||
rv, err := testVersioner{}.ParseResourceVersion(event.Object.(*examplev1.Pod).ResourceVersion)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected parsing error: %v", err)
|
||||
} else {
|
||||
if rv < currentRV {
|
||||
t.Errorf("watcher going back in time")
|
||||
}
|
||||
currentRV = rv
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
w2.Stop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue