diff --git a/pkg/api/helper.go b/pkg/api/helper.go index a1365c3d3d..7b7174b961 100644 --- a/pkg/api/helper.go +++ b/pkg/api/helper.go @@ -25,19 +25,23 @@ import ( "gopkg.in/v1/yaml" ) -type EncodingInterface interface { +// codec defines methods for serializing and deserializing API +// objects +type codec interface { Encode(obj interface{}) (data []byte, err error) Decode(data []byte) (interface{}, error) DecodeInto(data []byte, obj interface{}) error } -type VersioningInterface interface { +// resourceVersioner provides methods for setting and retrieving +// the resource version from an API object +type resourceVersioner interface { SetResourceVersion(obj interface{}, version uint64) error ResourceVersion(obj interface{}) (uint64, error) } -var Encoding EncodingInterface -var Versioning VersioningInterface +var Codec codec +var ResourceVersioner resourceVersioner var conversionScheme *conversion.Scheme @@ -101,8 +105,8 @@ func init() { }, ) - Encoding = conversionScheme - Versioning = JSONBaseVersioning{} + Codec = conversionScheme + ResourceVersioner = NewJSONBaseResourceVersioner() } // AddKnownTypes registers the types of the arguments to the marshaller of the package api. diff --git a/pkg/api/jsonbase.go b/pkg/api/jsonbase.go index 46b62418f3..80d57c82b0 100644 --- a/pkg/api/jsonbase.go +++ b/pkg/api/jsonbase.go @@ -21,10 +21,15 @@ import ( "reflect" ) -// versionedJSONBase allows access to the version state of a JSONBase object -type JSONBaseVersioning struct{} +// NewJSONBaseVersioner returns a resourceVersioner that can set or retrieve +// ResourceVersion on objects derived from JSONBase. +func NewJSONBaseResourceVersioner() resourceVersioner { + return &jsonBaseResourceVersioner{} +} -func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) { +type jsonBaseResourceVersioner struct{} + +func (v jsonBaseResourceVersioner) ResourceVersion(obj interface{}) (uint64, error) { json, err := FindJSONBaseRO(obj) if err != nil { return 0, err @@ -32,7 +37,7 @@ func (v JSONBaseVersioning) ResourceVersion(obj interface{}) (uint64, error) { return json.ResourceVersion, nil } -func (v JSONBaseVersioning) SetResourceVersion(obj interface{}, version uint64) error { +func (v jsonBaseResourceVersioner) SetResourceVersion(obj interface{}, version uint64) error { json, err := FindJSONBase(obj) if err != nil { return err diff --git a/pkg/api/jsonbase_test.go b/pkg/api/jsonbase_test.go index 413183a0a0..4a81479705 100644 --- a/pkg/api/jsonbase_test.go +++ b/pkg/api/jsonbase_test.go @@ -67,7 +67,7 @@ func TestGenericJSONBase(t *testing.T) { } } -func TestVersioningOfAPI(t *testing.T) { +func TestResourceVersionerOfAPI(t *testing.T) { type T struct { Object interface{} Expected uint64 @@ -77,7 +77,7 @@ func TestVersioningOfAPI(t *testing.T) { "api object with version": {Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1}, "pointer to api object with version": {&Service{JSONBase: JSONBase{ResourceVersion: 1}}, 1}, } - versioning := JSONBaseVersioning{} + versioning := NewJSONBaseResourceVersioner() for key, testCase := range testCases { actual, err := versioning.ResourceVersion(testCase.Object) if err != nil { diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index 04e04c552e..f6aea24e7d 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -32,7 +32,6 @@ import ( // EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd. type EtcdRegistry struct { - client tools.EtcdClient helper tools.EtcdHelper machines MinionRegistry manifestFactory ManifestFactory @@ -44,8 +43,7 @@ type EtcdRegistry struct { // 'scheduler' is the scheduling algorithm to use. func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { registry := &EtcdRegistry{ - client: client, - helper: tools.EtcdHelper{client, api.Encoding, api.Versioning}, + helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner}, machines: machines, } registry.manifestFactory = &BasicManifestFactory{ @@ -118,7 +116,7 @@ func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { }) if err != nil { // Don't strand stuff. - _, err2 := registry.client.Delete(podKey, false) + err2 := registry.helper.Delete(podKey, false) if err2 != nil { glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) } @@ -143,7 +141,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error // First delete the pod, so a scheduler doesn't notice it getting removed from the // machine and attempt to put it somewhere. podKey := makePodKey(machine, podID) - _, err := registry.client.Delete(podKey, true) + err := registry.helper.Delete(podKey, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } @@ -247,7 +245,7 @@ func (registry *EtcdRegistry) UpdateController(controller api.ReplicationControl // DeleteController deletes a ReplicationController specified by its ID. func (registry *EtcdRegistry) DeleteController(controllerID string) error { key := makeControllerKey(controllerID) - _, err := registry.client.Delete(key, false) + err := registry.helper.Delete(key, false) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("replicationController", controllerID) } @@ -295,7 +293,7 @@ func makeServiceEndpointsKey(name string) string { // DeleteService deletes a Service specified by its name. func (registry *EtcdRegistry) DeleteService(name string) error { key := makeServiceKey(name) - _, err := registry.client.Delete(key, true) + err := registry.helper.Delete(key, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("service", name) } @@ -303,7 +301,7 @@ func (registry *EtcdRegistry) DeleteService(name string) error { return err } key = makeServiceEndpointsKey(name) - _, err = registry.client.Delete(key, true) + err = registry.helper.Delete(key, true) if !tools.IsEtcdNotFound(err) { return err } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 2307c1f347..4355ec1b50 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -42,13 +42,15 @@ var ( EtcdErrorValueRequired = &etcd.EtcdError{ErrorCode: EtcdErrorCodeValueRequired} ) -type Encoding interface { +// Codec provides methods for transforming Etcd values into objects and back +type Codec interface { Encode(obj interface{}) (data []byte, err error) Decode(data []byte) (interface{}, error) DecodeInto(data []byte, obj interface{}) error } -type Versioning interface { +// ResourceVersioner provides methods for managing object modification tracking +type ResourceVersioner interface { SetResourceVersion(obj interface{}, version uint64) error ResourceVersion(obj interface{}) (uint64, error) } @@ -71,16 +73,17 @@ type EtcdGetSet interface { Get(key string, sort, recursive bool) (*etcd.Response, error) Set(key, value string, ttl uint64) (*etcd.Response, error) Create(key, value string, ttl uint64) (*etcd.Response, error) + Delete(key string, recursive bool) (*etcd.Response, error) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) } // EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. type EtcdHelper struct { - Client EtcdGetSet - Encoding Encoding - // optional - Versioning Versioning + Client EtcdGetSet + Codec Codec + // optional, no atomic operations can be performed without this interface + ResourceVersioner ResourceVersioner } // IsEtcdNotFound returns true iff err is an etcd not found error. @@ -136,7 +139,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { v := pv.Elem() for _, node := range nodes { obj := reflect.New(v.Type().Elem()) - err = h.Encoding.DecodeInto([]byte(node.Value), obj.Interface()) + err = h.Codec.DecodeInto([]byte(node.Value), obj.Interface()) if err != nil { return err } @@ -145,7 +148,7 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error { return nil } -// Unmarshals json found at key into objPtr. On a not found error, will either return +// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return // a zero object of the requested type, or an error, depending on ignoreNotFound. Treats // empty responses and nil response nodes exactly like a not found error. func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error { @@ -170,21 +173,22 @@ func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNot return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response) } body = response.Node.Value - err = h.Encoding.DecodeInto([]byte(body), objPtr) - if h.Versioning != nil { - _ = h.Versioning.SetResourceVersion(objPtr, response.Node.ModifiedIndex) + err = h.Codec.DecodeInto([]byte(body), objPtr) + if h.ResourceVersioner != nil { + _ = h.ResourceVersioner.SetResourceVersion(objPtr, response.Node.ModifiedIndex) // being unable to set the version does not prevent the object from being extracted } return body, response.Node.ModifiedIndex, err } +// Create adds a new object at a key unless it already exists func (h *EtcdHelper) CreateObj(key string, obj interface{}) error { - data, err := h.Encoding.Encode(obj) + data, err := h.Codec.Encode(obj) if err != nil { return err } - if h.Versioning != nil { - if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 { + if h.ResourceVersioner != nil { + if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 { return errors.New("resourceVersion may not be set on objects to be created") } } @@ -193,15 +197,21 @@ func (h *EtcdHelper) CreateObj(key string, obj interface{}) error { return err } +// Delete removes the specified key +func (h *EtcdHelper) Delete(key string, recursive bool) error { + _, err := h.Client.Delete(key, recursive) + return err +} + // SetObj marshals obj via json, and stores under key. Will do an // atomic update if obj's ResourceVersion field is set. func (h *EtcdHelper) SetObj(key string, obj interface{}) error { - data, err := h.Encoding.Encode(obj) + data, err := h.Codec.Encode(obj) if err != nil { return err } - if h.Versioning != nil { - if version, err := h.Versioning.ResourceVersion(obj); err == nil && version != 0 { + if h.ResourceVersioner != nil { + if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 { _, err = h.Client.CompareAndSwap(key, string(data), 0, "", version) return err // err is shadowed! } @@ -253,7 +263,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate E return err } - data, err := h.Encoding.Encode(ret) + data, err := h.Codec.Encode(ret) if err != nil { return err } @@ -288,7 +298,7 @@ func Everything(interface{}) bool { // API objects, and any items passing 'filter' are sent down the returned // watch.Interface. func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) { - w := newEtcdWatcher(true, filter, h.Encoding) + w := newEtcdWatcher(true, filter, h.Codec) go w.etcdWatch(h.Client, key) return w, nil } @@ -296,14 +306,14 @@ func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, // Watch begins watching the specified key. Events are decoded into // API objects and sent down the returned watch.Interface. func (h *EtcdHelper) Watch(key string) (watch.Interface, error) { - w := newEtcdWatcher(false, nil, h.Encoding) + w := newEtcdWatcher(false, nil, h.Codec) go w.etcdWatch(h.Client, key) return w, nil } // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { - encoding Encoding + encoding Codec list bool // If we're doing a recursive watch, should be true. filter FilterFunc @@ -322,7 +332,7 @@ type etcdWatcher struct { } // Returns a new etcdWatcher; if list is true, watch sub-nodes. -func newEtcdWatcher(list bool, filter FilterFunc, encoding Encoding) *etcdWatcher { +func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, list: list, diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 877a821d46..736ee915e4 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -40,8 +40,8 @@ type TestResource struct { } var scheme *conversion.Scheme -var encoding = api.Encoding -var versioning = api.Versioning +var codec = api.Codec +var versioner = api.ResourceVersioner func init() { scheme = conversion.NewScheme() @@ -87,7 +87,7 @@ func TestExtractList(t *testing.T) { {JSONBase: api.JSONBase{ID: "baz"}}, } var got []api.Pod - helper := EtcdHelper{fakeClient, encoding, versioning} + helper := EtcdHelper{fakeClient, codec, versioner} err := helper.ExtractList("/some/key", &got) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -101,7 +101,7 @@ func TestExtractObj(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) expect := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} fakeClient.Set("/some/key", util.MakeJSONString(expect), 0) - helper := EtcdHelper{fakeClient, encoding, versioning} + helper := EtcdHelper{fakeClient, codec, versioner} var got api.Pod err := helper.ExtractObj("/some/key", &got, false) if err != nil { @@ -134,7 +134,7 @@ func TestExtractObjNotFoundErr(t *testing.T) { }, }, } - helper := EtcdHelper{fakeClient, encoding, versioning} + helper := EtcdHelper{fakeClient, codec, versioner} try := func(key string) { var got api.Pod err := helper.ExtractObj(key, &got, false) @@ -155,12 +155,12 @@ func TestExtractObjNotFoundErr(t *testing.T) { func TestSetObj(t *testing.T) { obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} fakeClient := MakeFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, encoding, versioning} + helper := EtcdHelper{fakeClient, codec, versioner} err := helper.SetObj("/some/key", obj) if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err := encoding.Encode(obj) + data, err := codec.Encode(obj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -184,12 +184,12 @@ func TestSetObjWithVersion(t *testing.T) { }, } - helper := EtcdHelper{fakeClient, encoding, versioning} + helper := EtcdHelper{fakeClient, codec, versioner} err := helper.SetObj("/some/key", obj) if err != nil { t.Fatalf("Unexpected error %#v", err) } - data, err := encoding.Encode(obj) + data, err := codec.Encode(obj) if err != nil { t.Fatalf("Unexpected error %#v", err) } @@ -200,15 +200,15 @@ func TestSetObjWithVersion(t *testing.T) { } } -func TestSetObjWithoutVersioning(t *testing.T) { +func TestSetObjWithoutResourceVersioner(t *testing.T) { obj := api.Pod{JSONBase: api.JSONBase{ID: "foo"}} fakeClient := MakeFakeEtcdClient(t) - helper := EtcdHelper{fakeClient, encoding, nil} + helper := EtcdHelper{fakeClient, codec, nil} err := helper.SetObj("/some/key", obj) if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err := encoding.Encode(obj) + data, err := codec.Encode(obj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -222,8 +222,8 @@ func TestSetObjWithoutVersioning(t *testing.T) { func TestAtomicUpdate(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) fakeClient.TestIndex = true - encoding := scheme - helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} + codec := scheme + helper := EtcdHelper{fakeClient, codec, api.NewJSONBaseResourceVersioner()} // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") @@ -234,7 +234,7 @@ func TestAtomicUpdate(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err := encoding.Encode(obj) + data, err := codec.Encode(obj) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -259,7 +259,7 @@ func TestAtomicUpdate(t *testing.T) { if err != nil { t.Errorf("Unexpected error %#v", err) } - data, err = encoding.Encode(objUpdate) + data, err = codec.Encode(objUpdate) if err != nil { t.Errorf("Unexpected error %#v", err) } @@ -277,8 +277,8 @@ func TestAtomicUpdate(t *testing.T) { func TestAtomicUpdate_CreateCollision(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) fakeClient.TestIndex = true - encoding := scheme - helper := EtcdHelper{fakeClient, encoding, api.JSONBaseVersioning{}} + codec := scheme + helper := EtcdHelper{fakeClient, codec, api.NewJSONBaseResourceVersioner()} fakeClient.ExpectNotFoundGet("/some/key") @@ -317,7 +317,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { // Check that stored TestResource has received all updates. body := fakeClient.Data["/some/key"].R.Node.Value stored := &TestResource{} - if err := encoding.DecodeInto([]byte(body), stored); err != nil { + if err := codec.DecodeInto([]byte(body), stored); err != nil { t.Errorf("Error decoding stored value: %v", body) } if stored.Value != concurrency { @@ -329,9 +329,9 @@ func TestWatchInterpretation_ListAdd(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := encoding.Encode(pod) + podBytes, _ := codec.Encode(pod) go w.sendResult(&etcd.Response{ Action: "set", @@ -353,9 +353,9 @@ func TestWatchInterpretation_Delete(t *testing.T) { w := newEtcdWatcher(true, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := encoding.Encode(pod) + podBytes, _ := codec.Encode(pod) go w.sendResult(&etcd.Response{ Action: "delete", @@ -377,7 +377,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -391,7 +391,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -404,7 +404,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { w := newEtcdWatcher(false, func(interface{}) bool { t.Errorf("unexpected filter call") return true - }, encoding) + }, codec) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -418,7 +418,7 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { func TestWatch(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) - h := EtcdHelper{fakeClient, encoding, versioning} + h := EtcdHelper{fakeClient, codec, versioner} watching, err := h.Watch("/some/key") if err != nil { @@ -429,7 +429,7 @@ func TestWatch(t *testing.T) { // Test normal case pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} - podBytes, _ := encoding.Encode(pod) + podBytes, _ := codec.Encode(pod) fakeClient.WatchResponse <- &etcd.Response{ Action: "set", Node: &etcd.Node{ @@ -459,7 +459,7 @@ func TestWatch(t *testing.T) { func TestWatchPurposefulShutdown(t *testing.T) { fakeClient := MakeFakeEtcdClient(t) - h := EtcdHelper{fakeClient, encoding, versioning} + h := EtcdHelper{fakeClient, codec, versioner} // Test purposeful shutdown watching, err := h.Watch("/some/key")