mirror of https://github.com/k3s-io/k3s
etcd3/watcher: Event.Object should have the same rev as etcd delete
instead of previous object's revision.pull/6/head
parent
def7639457
commit
da7e9783e8
|
@ -322,7 +322,11 @@ func prepareObjs(ctx context.Context, e *event, client *clientv3.Client, codec r
|
|||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, getResp.Kvs[0].ModRevision)
|
||||
// Note that this sends the *old* object with the etcd revision for the time at
|
||||
// which it gets deleted.
|
||||
// We assume old object is returned only in Deleted event. Users (e.g. cacher) need
|
||||
// to have larger than previous rev to tell the ordering.
|
||||
oldObj, err = decodeObj(codec, versioner, getResp.Kvs[0].Value, e.rev)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -20,11 +20,11 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"sync"
|
||||
|
||||
"github.com/coreos/etcd/clientv3"
|
||||
"github.com/coreos/etcd/integration"
|
||||
"golang.org/x/net/context"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
|
@ -123,7 +123,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
|
|||
if err := store.Delete(ctx, key, &api.Pod{}, nil); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
testCheckResult(t, 0, watch.Deleted, w, storedObj)
|
||||
testCheckResult(t, 0, watch.Deleted, w, nil)
|
||||
}
|
||||
|
||||
// TestWatchSync tests that
|
||||
|
@ -213,6 +213,36 @@ func TestWatchErrResultNotBlockAfterCancel(t *testing.T) {
|
|||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestWatchDeleteEventObjectShouldHaveLatestRV(t *testing.T) {
|
||||
ctx, store, cluster := testSetup(t)
|
||||
defer cluster.Terminate(t)
|
||||
key, storedObj := testPropogateStore(t, store, ctx, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})
|
||||
|
||||
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Watch failed: %v", err)
|
||||
}
|
||||
etcdW := cluster.RandClient().Watch(ctx, "/", clientv3.WithPrefix())
|
||||
|
||||
if err := store.Delete(ctx, key, &api.Pod{}, &storage.Preconditions{}); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
|
||||
e := <-w.ResultChan()
|
||||
watchedDeleteObj := e.Object.(*api.Pod)
|
||||
var wres clientv3.WatchResponse
|
||||
wres = <-etcdW
|
||||
|
||||
watchedDeleteRev, err := storage.ParseWatchResourceVersion(watchedDeleteObj.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("ParseWatchResourceVersion failed: %v", err)
|
||||
}
|
||||
if int64(watchedDeleteRev) != wres.Events[0].Kv.ModRevision {
|
||||
t.Errorf("Object from delete event have version: %v, should be the same as etcd delete's mod rev: %d",
|
||||
watchedDeleteRev, wres.Events[0].Kv.ModRevision)
|
||||
}
|
||||
}
|
||||
|
||||
type testWatchStruct struct {
|
||||
obj *api.Pod
|
||||
expectEvent bool
|
||||
|
|
Loading…
Reference in New Issue