mirror of https://github.com/k3s-io/k3s
Merge pull request #36561 from liggitt/etcd3-watch-zero
Automatic merge from submit-queue Fix watching from resourceVersion=0 in etcd3 watcher Fixes https://github.com/kubernetes/kubernetes/issues/36545 * Makes etcd3 consistent with watch cache behavior (all synthetic events sent for the initial list of items result in ADDED events) * Fixes errors if previous values of initial items had been compacted away * Removes fan-out Get() for previous values of initial items Should be fixed before making etcd3 the default (https://github.com/kubernetes/kubernetes/pull/36229)pull/6/head
commit
76f99d726c
|
@ -30,14 +30,15 @@ type event struct {
|
|||
isCreated bool
|
||||
}
|
||||
|
||||
func parseKV(kv *mvccpb.KeyValue, prevVal []byte) *event {
|
||||
// parseKV converts a KeyValue retrieved from an initial sync() listing to a synthetic isCreated event.
|
||||
func parseKV(kv *mvccpb.KeyValue) *event {
|
||||
return &event{
|
||||
key: string(kv.Key),
|
||||
value: kv.Value,
|
||||
prevValue: prevVal,
|
||||
prevValue: nil,
|
||||
rev: kv.ModRevision,
|
||||
isDeleted: false,
|
||||
isCreated: kv.ModRevision == kv.CreateRevision,
|
||||
isCreated: true,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -146,6 +146,7 @@ func (wc *watchChan) ResultChan() <-chan watch.Event {
|
|||
|
||||
// sync tries to retrieve existing data and send them to process.
|
||||
// The revision to watch will be set to the revision in response.
|
||||
// All events sent will have isCreated=true
|
||||
func (wc *watchChan) sync() error {
|
||||
opts := []clientv3.OpOption{}
|
||||
if wc.recursive {
|
||||
|
@ -156,17 +157,8 @@ func (wc *watchChan) sync() error {
|
|||
return err
|
||||
}
|
||||
wc.initialRev = getResp.Header.Revision
|
||||
|
||||
for _, kv := range getResp.Kvs {
|
||||
prevResp, err := wc.watcher.client.Get(wc.ctx, string(kv.Key), clientv3.WithRev(kv.ModRevision-1), clientv3.WithSerializable())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var prevVal []byte
|
||||
if len(prevResp.Kvs) > 0 {
|
||||
prevVal = prevResp.Kvs[0].Value
|
||||
}
|
||||
wc.sendEvent(parseKV(kv, prevVal))
|
||||
wc.sendEvent(parseKV(kv))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
@ -141,17 +142,64 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||
|
||||
// TestWatchFromZero tests that
|
||||
// - watch from 0 should sync up and grab the object added before
|
||||
// - watch from 0 is able to return events for objects whose previous version has been compacted
|
||||
// - watch from non-0 should just watch changes after given version
|
||||
func TestWatchFromZero(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
|
||||
key, storedObj := testPropogateStore(ctx, t, store, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 0, watch.Added, w, storedObj)
|
||||
w.Stop()
|
||||
|
||||
// Update
|
||||
out := &api.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Make sure when we watch from 0 we receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 1, watch.Added, w, out)
|
||||
w.Stop()
|
||||
|
||||
// Update again
|
||||
out = &api.Pod{}
|
||||
err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate(
|
||||
func(runtime.Object) (runtime.Object, error) {
|
||||
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil
|
||||
}))
|
||||
if err != nil {
|
||||
t.Fatalf("GuaranteedUpdate failed: %v", err)
|
||||
}
|
||||
|
||||
// Compact previous versions
|
||||
revToCompact, err := strconv.Atoi(out.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err)
|
||||
}
|
||||
_, err = cluster.RandClient().Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical())
|
||||
if err != nil {
|
||||
t.Fatalf("Error compacting: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we can still watch from 0 and receive an ADDED event
|
||||
w, err = store.Watch(ctx, key, "0", storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 2, watch.Added, w, out)
|
||||
}
|
||||
|
||||
// TestWatchFromNoneZero tests that
|
||||
|
|
Loading…
Reference in New Issue