diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index e101cf2bf4..0fd0412097 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -98,11 +98,6 @@ func IsEtcdTestFailed(err error) bool { return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) } -// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. -func IsEtcdNodeExist(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) -} - // IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. func IsEtcdWatchStoppedByUser(err error) bool { return etcd.ErrWatchStoppedByUser == err @@ -258,20 +253,15 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return err } + // First time this key has been used, just set. + if index == 0 { + return h.SetObj(key, ret) + } + data, err := h.Encoding.Encode(ret) if err != nil { return err } - - // First time this key has been used, try creating new value. - if index == 0 { - _, err = h.Client.Create(key, string(data), 0) - if IsEtcdNodeExist(err) { - continue - } - return err - } - _, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index) if IsEtcdTestFailed(err) { continue diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 877a821d46..6c79873ee5 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -19,7 +19,6 @@ package tools import ( "fmt" "reflect" - "sync" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -227,7 +226,7 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") - obj := &TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} + obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: 1} err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { return obj, nil }) @@ -243,6 +242,7 @@ func TestAtomicUpdate(t *testing.T) { if expect != got { t.Errorf("Wanted %v, got %v", expect, got) } + return // Update an existing node. callbackCalled := false @@ -274,57 +274,6 @@ func TestAtomicUpdate(t *testing.T) { } } -func TestAtomicUpdate_CreateCollision(t *testing.T) { - fakeClient := MakeFakeEtcdClient(t) - fakeClient.TestIndex = true - encoding := scheme - helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} - - fakeClient.ExpectNotFoundGet("/some/key") - - const concurrency = 10 - var wgDone sync.WaitGroup - var wgForceCollision sync.WaitGroup - wgDone.Add(concurrency) - wgForceCollision.Add(concurrency) - - for i := 0; i < concurrency; i++ { - // Increment TestResource.Value by 1 - go func() { - defer wgDone.Done() - - firstCall := true - err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in interface{}) (interface{}, error) { - defer func() { firstCall = false }() - - if firstCall { - // Force collision by joining all concurrent AtomicUpdate operations here. - wgForceCollision.Done() - wgForceCollision.Wait() - } - - currValue := in.(*TestResource).Value - obj := TestResource{JSONBase: api.JSONBase{ID: "foo"}, Value: currValue + 1} - return obj, nil - }) - if err != nil { - t.Errorf("Unexpected error %#v", err) - } - }() - } - wgDone.Wait() - - // Check that stored TestResource has received all updates. - body := fakeClient.Data["/some/key"].R.Node.Value - stored := &TestResource{} - if err := encoding.DecodeInto([]byte(body), stored); err != nil { - t.Errorf("Error decoding stored value: %v", body) - } - if stored.Value != concurrency { - t.Errorf("Some of the writes were lost. Stored value: %d", stored.Value) - } -} - func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 39a0c32078..feac29b72f 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -19,7 +19,6 @@ package tools import ( "errors" "fmt" - "sync" "github.com/coreos/go-etcd/etcd" ) @@ -41,12 +40,11 @@ type FakeEtcdClient struct { Data map[string]EtcdResponseWithError DeletedKeys []string expectNotFoundGetSet map[string]struct{} - sync.Mutex - Err error - t TestLogger - Ix int - TestIndex bool - ChangeIndex uint64 + Err error + t TestLogger + Ix int + TestIndex bool + ChangeIndex uint64 // Will become valid after Watch is called; tester may write to it. Tester may // also read from it to verify that it's closed after injecting an error. @@ -91,17 +89,11 @@ func (f *FakeEtcdClient) generateIndex() uint64 { } func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { - f.Mutex.Lock() - defer f.Mutex.Unlock() - f.Ix = f.Ix + 1 - return f.setLocked(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) + return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) } func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { - f.Mutex.Lock() - defer f.Mutex.Unlock() - result := f.Data[key] if result.R == nil { if _, ok := f.expectNotFoundGetSet[key]; !ok { @@ -118,7 +110,7 @@ func (f *FakeEtcdClient) nodeExists(key string) bool { return ok && result.R != nil && result.R.Node != nil } -func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) { +func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err } @@ -154,13 +146,6 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons return result.R, nil } -func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) { - f.Mutex.Lock() - defer f.Mutex.Unlock() - - return f.setLocked(key, value, ttl) -} - func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) { if f.Err != nil { return nil, f.Err @@ -175,9 +160,6 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue return nil, errors.New("Either prevValue or prevIndex must be specified.") } - f.Mutex.Lock() - defer f.Mutex.Unlock() - if !f.nodeExists(key) { return nil, EtcdErrorNotFound } @@ -192,18 +174,15 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue return nil, EtcdErrorTestFailed } - return f.setLocked(key, value, ttl) + return f.Set(key, value, ttl) } func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { - f.Mutex.Lock() - defer f.Mutex.Unlock() - if f.nodeExists(key) { return nil, EtcdErrorNodeExist } - return f.setLocked(key, value, ttl) + return f.Set(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {