diff --git a/pkg/registry/event/registry.go b/pkg/registry/event/registry.go index a3fba17f6c..fda5e32aa3 100644 --- a/pkg/registry/event/registry.go +++ b/pkg/registry/event/registry.go @@ -44,7 +44,7 @@ func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry { KeyFunc: func(ctx api.Context, id string) (string, error) { return etcdgeneric.NamespaceKeyFunc(ctx, prefix, id) }, - TTLFunc: func(runtime.Object, bool) (uint64, error) { + TTLFunc: func(runtime.Object, uint64, bool) (uint64, error) { return ttl, nil }, Helper: h, diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index ce2923b47a..ee148c7f43 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -73,8 +73,9 @@ type Etcd struct { ObjectNameFunc func(obj runtime.Object) (string, error) // Return the TTL objects should be persisted with. Update is true if this - // is an operation against an existing object. - TTLFunc func(obj runtime.Object, update bool) (uint64, error) + // is an operation against an existing object. Existing is the current TTL + // or the default for this operation. + TTLFunc func(obj runtime.Object, existing uint64, update bool) (uint64, error) // Returns a matcher corresponding to the provided labels and fields. PredicateFunc func(label labels.Selector, field fields.Selector) generic.Matcher @@ -185,12 +186,9 @@ func (e *Etcd) CreateWithName(ctx api.Context, name string, obj runtime.Object) return err } } - ttl := uint64(0) - if e.TTLFunc != nil { - ttl, err = e.TTLFunc(obj, false) - if err != nil { - return err - } + ttl, err := e.calculateTTL(obj, 0, false) + if err != nil { + return err } err = e.Helper.CreateObj(key, obj, nil, ttl) err = etcderr.InterpretCreateError(err, e.EndpointName, name) @@ -215,12 +213,9 @@ func (e *Etcd) Create(ctx api.Context, obj runtime.Object) (runtime.Object, erro if err != nil { return nil, err } - ttl := uint64(0) - if e.TTLFunc != nil { - ttl, err = e.TTLFunc(obj, false) - if err != nil { - return nil, err - } + ttl, err := e.calculateTTL(obj, 0, false) + if err != nil { + return nil, err } trace.Step("About to create object") out := e.NewFunc() @@ -250,12 +245,9 @@ func (e *Etcd) UpdateWithName(ctx api.Context, name string, obj runtime.Object) if err != nil { return err } - ttl := uint64(0) - if e.TTLFunc != nil { - ttl, err = e.TTLFunc(obj, true) - if err != nil { - return err - } + ttl, err := e.calculateTTL(obj, 0, true) + if err != nil { + return err } err = e.Helper.SetObj(key, obj, nil, ttl) err = etcderr.InterpretUpdateError(err, e.EndpointName, name) @@ -282,49 +274,46 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool // TODO: expose TTL creating := false out := e.NewFunc() - err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) { + err = e.Helper.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res tools.ResponseMeta) (runtime.Object, *uint64, error) { version, err := e.Helper.Versioner.ObjectResourceVersion(existing) if err != nil { - return nil, 0, err + return nil, nil, err } if version == 0 { if !e.UpdateStrategy.AllowCreateOnUpdate() { - return nil, 0, kubeerr.NewNotFound(e.EndpointName, name) + return nil, nil, kubeerr.NewNotFound(e.EndpointName, name) } creating = true if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { - return nil, 0, err + return nil, nil, err } - ttl := uint64(0) - if e.TTLFunc != nil { - ttl, err = e.TTLFunc(obj, false) - if err != nil { - return nil, 0, err - } + ttl, err := e.calculateTTL(obj, 0, false) + if err != nil { + return nil, nil, err } - return obj, ttl, nil + return obj, &ttl, nil } creating = false newVersion, err := e.Helper.Versioner.ObjectResourceVersion(obj) if err != nil { - return nil, 0, err + return nil, nil, err } if newVersion != version { // TODO: return the most recent version to a client? - return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) + return nil, nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) } if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { - return nil, 0, err + return nil, nil, err } - ttl := uint64(0) - if e.TTLFunc != nil { - ttl, err = e.TTLFunc(obj, true) - if err != nil { - return nil, 0, err - } + ttl, err := e.calculateTTL(obj, res.TTL, true) + if err != nil { + return nil, nil, err } - return obj, ttl, nil + if int64(ttl) != res.TTL { + return obj, &ttl, nil + } + return obj, nil, nil }) if err != nil { @@ -480,3 +469,19 @@ func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersio return e.Helper.WatchList(e.KeyRootFunc(ctx), version, filterFunc) } + +// calculateTTL is a helper for retrieving the updated TTL for an object or returning an error +// if the TTL cannot be calculated. The defaultTTL is changed to 1 if less than zero. Zero means +// no TTL, not expire immediately. +func (e *Etcd) calculateTTL(obj runtime.Object, defaultTTL int64, update bool) (ttl uint64, err error) { + // etcd may return a negative TTL for a node if the expiration has not occured due + // to server lag - we will ensure that the value is at least set. + if defaultTTL < 0 { + defaultTTL = 1 + } + ttl = uint64(defaultTTL) + if e.TTLFunc != nil { + ttl, err = e.TTLFunc(obj, ttl, update) + } + return ttl, err +} diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index cf8e6816ee..05538b9c8b 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -142,13 +142,16 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin if err != nil { return nil, err } - err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) { + err = r.store.Helper.GuaranteedUpdate(podKey, &api.Pod{}, false, tools.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { - return nil, 0, fmt.Errorf("unexpected object: %#v", obj) + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + if pod.DeletionTimestamp != nil { + return nil, fmt.Errorf("pod %s is being deleted, cannot be assigned to a host", pod.Name) } if pod.Spec.NodeName != oldMachine { - return nil, 0, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName) + return nil, fmt.Errorf("pod %v is already assigned to node %q", pod.Name, pod.Spec.NodeName) } pod.Spec.NodeName = machine if pod.Annotations == nil { @@ -158,8 +161,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin pod.Annotations[k] = v } finalPod = pod - return pod, 0, nil - }) + return pod, nil + })) return finalPod, err } diff --git a/pkg/registry/service/allocator/etcd/etcd.go b/pkg/registry/service/allocator/etcd/etcd.go index c2460405d6..f1a4981b27 100644 --- a/pkg/registry/service/allocator/etcd/etcd.go +++ b/pkg/registry/service/allocator/etcd/etcd.go @@ -141,25 +141,25 @@ func (e *Etcd) Release(item int) error { // tryUpdate performs a read-update to persist the latest snapshot state of allocation. func (e *Etcd) tryUpdate(fn func() error) error { err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, - func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { + tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { existing := input.(*api.RangeAllocation) if len(existing.ResourceVersion) == 0 { - return nil, 0, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind) + return nil, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind) } if existing.ResourceVersion != e.last { if err := e.alloc.Restore(existing.Range, existing.Data); err != nil { - return nil, 0, err + return nil, err } if err := fn(); err != nil { - return nil, 0, err + return nil, err } } e.last = existing.ResourceVersion rangeSpec, data := e.alloc.Snapshot() existing.Range = rangeSpec existing.Data = data - return existing, 0, nil - }, + return existing, nil + }), ) return etcderr.InterpretUpdateError(err, e.kind, "") } @@ -198,19 +198,19 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error { last := "" err := e.helper.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, - func(input runtime.Object) (output runtime.Object, ttl uint64, err error) { + tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { existing := input.(*api.RangeAllocation) switch { case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0: if snapshot.ResourceVersion != existing.ResourceVersion { - return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match")) + return nil, k8serr.NewConflict(e.kind, "", fmt.Errorf("the provided resource version does not match")) } case len(existing.ResourceVersion) != 0: - return nil, 0, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource")) + return nil, k8serr.NewConflict(e.kind, "", fmt.Errorf("another caller has already initialized the resource")) } last = snapshot.ResourceVersion - return snapshot, 0, nil - }, + return snapshot, nil + }), ) if err != nil { return etcderr.InterpretUpdateError(err, e.kind, "") diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go index 3976d3ea3f..d60cf68b43 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/tools/etcd_helper.go @@ -335,23 +335,25 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error // empty responses and nil response nodes exactly like a not found error. func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error { key = h.PrefixEtcdKey(key) - _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) + _, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) return err } -func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) { +// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information +// about the response, like the current etcd index and the ttl. +func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) { startTime := time.Now() response, err := h.Client.Get(key, false, false) recordEtcdRequestLatency("get", getTypeName(objPtr), startTime) if err != nil && !IsEtcdNotFound(err) { - return "", 0, err + return "", nil, nil, err } - return h.extractObj(response, err, objPtr, ignoreNotFound, false) + body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false) + return body, node, response, err } -func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, modifiedIndex uint64, err error) { - var node *etcd.Node +func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) { if response != nil { if prevNode { node = response.PrevNode @@ -363,14 +365,14 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run if ignoreNotFound { v, err := conversion.EnforcePtr(objPtr) if err != nil { - return "", 0, err + return "", nil, err } v.Set(reflect.Zero(v.Type())) - return "", 0, nil + return "", nil, nil } else if inErr != nil { - return "", 0, inErr + return "", nil, inErr } - return "", 0, fmt.Errorf("unable to locate a value on the response: %#v", response) + return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response) } body = node.Value err = h.Codec.DecodeInto([]byte(body), objPtr) @@ -378,7 +380,7 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run _ = h.Versioner.UpdateObject(objPtr, node) // being unable to set the version does not prevent the object from being extracted } - return body, node.ModifiedIndex, err + return body, node, err } // CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds, @@ -482,9 +484,28 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err return err } +// ResponseMeta contains information about the etcd metadata that is associated with +// an object. It abstracts the actual underlying objects to prevent coupling with etcd +// and to improve testability. +type ResponseMeta struct { + // TTL is the time to live of the node that contained the returned object. It may be + // zero or negative in some cases (objects may be expired after the requested + // expiration time due to server lag). + TTL int64 +} + // Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed. // See the comment for GuaranteedUpdate for more detail. -type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error) +type EtcdUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error) +type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error) + +// SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc +func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc { + return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) { + out, err := fn(input) + return out, nil, err + } +} // GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps // calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object @@ -495,7 +516,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint6 // Example: // // h := &util.EtcdHelper{client, encoding, versioning} -// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) { +// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { // // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey". // // cur := input.(*MyType) // Guaranteed to succeed. @@ -503,9 +524,9 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint6 // // Make a *modification*. // cur.Counter++ // -// // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set -// // the TTL on the object. -// return cur, 0, nil +// // Return the modified object. Return an error to stop iterating. Return a uint64 to alter +// // the TTL on the object, or nil to keep it the same value. +// return cur, nil, nil // }) // func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { @@ -517,14 +538,33 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno key = h.PrefixEtcdKey(key) for { obj := reflect.New(v.Type()).Interface().(runtime.Object) - origBody, index, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) + origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) + if err != nil { + return err + } + meta := ResponseMeta{} + if node != nil { + meta.TTL = node.TTL + } + + ret, newTTL, err := tryUpdate(obj, meta) if err != nil { return err } - ret, ttl, err := tryUpdate(obj) - if err != nil { - return err + index := uint64(0) + ttl := uint64(0) + if node != nil { + index = node.ModifiedIndex + if node.TTL > 0 { + ttl = uint64(node.TTL) + } + } else if res != nil { + index = res.EtcdIndex + } + + if newTTL != nil { + ttl = *newTTL } data, err := h.Codec.Encode(ret) diff --git a/pkg/tools/etcd_helper_test.go b/pkg/tools/etcd_helper_test.go index 89400d4061..0da3f2542b 100644 --- a/pkg/tools/etcd_helper_test.go +++ b/pkg/tools/etcd_helper_test.go @@ -529,9 +529,9 @@ func TestGuaranteedUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { - return obj, 0, nil - }) + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + return obj, nil + })) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -548,15 +548,15 @@ func TestGuaranteedUpdate(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { callbackCalled = true if in.(*TestResource).Value != 1 { t.Errorf("Callback input was not current set value") } - return objUpdate, 0, nil - }) + return objUpdate, nil + })) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -575,6 +575,107 @@ func TestGuaranteedUpdate(t *testing.T) { } } +func TestGuaranteedUpdateTTL(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.TestIndex = true + helper := NewEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) + key := etcdtest.AddPrefix("/some/key") + + // Create a new node. + fakeClient.ExpectNotFoundGet(key) + obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + if res.TTL != 0 { + t.Fatalf("unexpected response meta: %#v", res) + } + ttl := uint64(10) + return obj, &ttl, nil + }) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + data, err := codec.Encode(obj) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + expect := string(data) + got := fakeClient.Data[key].R.Node.Value + if expect != got { + t.Errorf("Wanted %v, got %v", expect, got) + } + if fakeClient.Data[key].R.Node.TTL != 10 { + t.Errorf("expected TTL set: %d", fakeClient.Data[key].R.Node.TTL) + } + + // Update an existing node. + callbackCalled := false + objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + if res.TTL != 10 { + t.Fatalf("unexpected response meta: %#v", res) + } + callbackCalled = true + + if in.(*TestResource).Value != 1 { + t.Errorf("Callback input was not current set value") + } + + return objUpdate, nil, nil + }) + + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + data, err = codec.Encode(objUpdate) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + expect = string(data) + got = fakeClient.Data[key].R.Node.Value + if expect != got { + t.Errorf("Wanted %v, got %v", expect, got) + } + if fakeClient.Data[key].R.Node.TTL != 10 { + t.Errorf("expected TTL remained set: %d", fakeClient.Data[key].R.Node.TTL) + } + + // Update an existing node and change ttl + callbackCalled = false + objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3} + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + if res.TTL != 10 { + t.Fatalf("unexpected response meta: %#v", res) + } + callbackCalled = true + + if in.(*TestResource).Value != 2 { + t.Errorf("Callback input was not current set value") + } + + newTTL := uint64(20) + return objUpdate, &newTTL, nil + }) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + data, err = codec.Encode(objUpdate) + if err != nil { + t.Errorf("Unexpected error %#v", err) + } + expect = string(data) + got = fakeClient.Data[key].R.Node.Value + if expect != got { + t.Errorf("Wanted %v, got %v", expect, got) + } + if fakeClient.Data[key].R.Node.TTL != 20 { + t.Errorf("expected TTL changed: %d", fakeClient.Data[key].R.Node.TTL) + } + + if !callbackCalled { + t.Errorf("tryUpdate callback should have been called.") + } +} + func TestGuaranteedUpdateNoChange(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -584,9 +685,9 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { - return obj, 0, nil - }) + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + return obj, nil + })) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -594,11 +695,11 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { fakeClient.Err = errors.New("should not be called") callbackCalled = true - return objUpdate, 0, nil - }) + return objUpdate, nil + })) if err != nil { t.Fatalf("Unexpected error %#v", err) } @@ -617,9 +718,9 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) { fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - f := func(in runtime.Object) (runtime.Object, uint64, error) { - return obj, 0, nil - } + f := SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + return obj, nil + }) ignoreNotFound := false err := helper.GuaranteedUpdate("/some/key", &TestResource{}, ignoreNotFound, f) @@ -654,7 +755,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { defer wgDone.Done() firstCall := true - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { defer func() { firstCall = false }() if firstCall { @@ -665,8 +766,8 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { currValue := in.(*TestResource).Value obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1} - return obj, 0, nil - }) + return obj, nil + })) if err != nil { t.Errorf("Unexpected error %#v", err) }