Merge pull request #740 from brendandburns/sync

Switched Set to Create in etcd tools
pull/6/head
Daniel Smith 2014-08-04 13:15:08 -07:00
commit f36d2a7ebd
3 changed files with 29 additions and 12 deletions

View File

@ -311,5 +311,6 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
// UpdateEndpoints update Endpoints of a Service.
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
return registry.helper.SetObj("/registry/services/endpoints/"+e.ID, e)
updateFunc := func(interface{}) (interface{}, error) { return e, nil }
return registry.helper.AtomicUpdate("/registry/services/endpoints/"+e.ID, &api.Endpoints{}, updateFunc)
}

View File

@ -66,6 +66,7 @@ func TestEtcdGetPodNotFound(t *testing.T) {
func TestEtcdCreatePod(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
@ -231,6 +232,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
@ -289,6 +291,8 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
func TestEtcdDeletePod(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
@ -320,6 +324,8 @@ func TestEtcdDeletePod(t *testing.T) {
func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{
@ -570,10 +576,12 @@ func TestEtcdCreateController(t *testing.T) {
func TestEtcdUpdateController(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.TestIndex = true
resp, _ := fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateController(api.ReplicationController{
JSONBase: api.JSONBase{ID: "foo"},
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
DesiredState: api.ReplicationControllerState{
Replicas: 2,
},
@ -701,10 +709,12 @@ func TestEtcdDeleteService(t *testing.T) {
func TestEtcdUpdateService(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.TestIndex = true
resp, _ := fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
testService := api.Service{
JSONBase: api.JSONBase{ID: "foo"},
JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex},
Labels: map[string]string{
"baz": "bar",
},
@ -722,18 +732,25 @@ func TestEtcdUpdateService(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
// Clear modified indices before the equality test.
svc.ResourceVersion = 0
testService.ResourceVersion = 0
if !reflect.DeepEqual(*svc, testService) {
t.Errorf("Unexpected service: got %#v, wanted %#v", svc, testService)
t.Errorf("Unexpected service: got\n %#v\n, wanted\n %#v", svc, testService)
}
}
func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := tools.MakeFakeEtcdClient(t)
fakeClient.TestIndex = true
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints := api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"baz", "bar"},
}
fakeClient.Set("/registry/services/endpoints/foo", util.MakeJSONString(api.Endpoints{}), 0)
err := registry.UpdateEndpoints(endpoints)
if err != nil {
t.Errorf("unexpected error: %v", err)

View File

@ -69,6 +69,7 @@ type EtcdClient interface {
type EtcdGetSet interface {
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
Create(key, value string, ttl uint64) (*etcd.Response, error)
CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error)
Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error)
}
@ -185,8 +186,8 @@ func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
}
}
// TODO: when client supports atomic creation, integrate this with the above.
_, err = h.Client.Set(key, string(data), 0)
// Create will fail if a key already exists.
_, err = h.Client.Create(key, string(data), 0)
return err
}
@ -232,8 +233,6 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E
}
// First time this key has been used, just set.
// TODO: This is racy. Fix when our client supports prevExist. See:
// https://github.com/coreos/etcd/blob/master/Documentation/api.md#atomic-compare-and-swap
if index == 0 {
return h.SetObj(key, ret)
}
@ -350,7 +349,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
switch res.Action {
case "set":
case "create", "set":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
@ -376,7 +375,7 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
obj, err := w.encoding.Decode(data)
if err != nil {
glog.Errorf("failure to decode api object: '%v' from %#v", string(data), res)
glog.Errorf("failure to decode api object: '%v' from %#v %#v", string(data), res, res.Node)
// TODO: expose an error through watch.Interface?
w.Stop()
return