diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 0cb49c0246..d63ea71d96 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -184,7 +184,7 @@ func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine stri if err != nil { return nil, err } - err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) { + err = r.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) @@ -211,7 +211,7 @@ func (r *Registry) assignPod(ctx api.Context, podID string, machine string) erro } // Doing the constraint check this way provides atomicity guarantees. contKey := makeBoundPodsKey(machine) - err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + err = r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { boundPodList := in.(*api.BoundPods) boundPodList.Items = append(boundPodList.Items, *boundPod) if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 { @@ -261,7 +261,7 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error { } containerKey := makeBoundPodsKey(podOut.Status.Host) - return r.AtomicUpdate(containerKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + return r.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { boundPods := in.(*api.BoundPods) for ix := range boundPods.Items { if boundPods.Items[ix].Name == pod.Name { @@ -299,7 +299,7 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { } // Next, remove the pod from the machine atomically. contKey := makeBoundPodsKey(machine) - return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + return r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) { pods := in.(*api.BoundPods) newPods := make([]api.BoundPod, 0, len(pods.Items)) found := false @@ -553,7 +553,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er return err } // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. - err = r.AtomicUpdate(key, &api.Endpoints{}, + err = r.AtomicUpdate(key, &api.Endpoints{}, true, func(input runtime.Object) (runtime.Object, error) { // TODO: racy - label query is returning different results for two simultaneous updaters return endpoints, nil diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 1e5e832423..1aeb7cd939 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -310,7 +310,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error // Example: // // h := &util.EtcdHelper{client, encoding, versioning} -// err := h.AtomicUpdate("myKey", &MyType{}, func(input runtime.Object) (runtime.Object, error) { +// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, error) { // // Before this function is called, currentObj has been reset to etcd's current // // contents for "myKey". // @@ -323,7 +323,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error // return cur, nil // }) // -func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdate EtcdUpdateFunc) error { +func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { v, err := conversion.EnforcePtr(ptrToType) if err != nil { // Panic is appropriate, because this is a programming error. @@ -331,7 +331,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdat } for { obj := reflect.New(v.Type()).Interface().(runtime.Object) - origBody, index, err := h.bodyAndExtractObj(key, obj, true) + origBody, index, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) if err != nil { return err } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index fadcd06202..621ede84ba 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -456,7 +456,7 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) { + err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { return obj, nil }) if err != nil { @@ -475,7 +475,7 @@ func TestAtomicUpdate(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) { + err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { callbackCalled = true if in.(*TestResource).Value != 1 { @@ -510,7 +510,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) { + err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { return obj, nil }) if err != nil { @@ -521,7 +521,7 @@ func TestAtomicUpdateNoChange(t *testing.T) { callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} fakeClient.Err = errors.New("should not be called") - err = helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) { + err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { callbackCalled = true return objUpdate, nil }) @@ -533,6 +533,32 @@ func TestAtomicUpdateNoChange(t *testing.T) { } } +func TestAtomicUpdateKeyNotFound(t *testing.T) { + fakeClient := NewFakeEtcdClient(t) + fakeClient.TestIndex = true + helper := EtcdHelper{fakeClient, codec, versioner} + + // Create a new node. + fakeClient.ExpectNotFoundGet("/some/key") + obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} + + f := func(in runtime.Object) (runtime.Object, error) { + return obj, nil + } + + ignoreNotFound := false + err := helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f) + if err == nil { + t.Errorf("Expected error for key not found.") + } + + ignoreNotFound = true + err = helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f) + if err != nil { + t.Errorf("Unexpected error %v.", err) + } +} + func TestAtomicUpdate_CreateCollision(t *testing.T) { fakeClient := NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -552,7 +578,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { defer wgDone.Done() firstCall := true - err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) { + err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { defer func() { firstCall = false }() if firstCall {