Move etcd helper to util so it can be used elsewhere, too

pull/6/head
Daniel Smith 2014-06-17 16:23:52 -07:00
parent a24116c7bd
commit 65d6280936
6 changed files with 364 additions and 188 deletions

View File

@ -27,7 +27,6 @@ import (
"testing" "testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
@ -398,13 +397,13 @@ func (cr *channelReader) GetList() [][]api.ContainerManifest {
} }
func TestGetKubeletStateFromEtcdNoData(t *testing.T) { func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: nil, E: nil,
} }
@ -420,13 +419,13 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
} }
func TestGetKubeletStateFromEtcd(t *testing.T) { func TestGetKubeletStateFromEtcd(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}), Value: util.MakeJSONString([]api.Container{}),
@ -444,13 +443,13 @@ func TestGetKubeletStateFromEtcd(t *testing.T) {
} }
func TestGetKubeletStateFromEtcdNotFound(t *testing.T) { func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ E: &etcd.EtcdError{
ErrorCode: 100, ErrorCode: 100,
@ -466,13 +465,13 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
} }
func TestGetKubeletStateFromEtcdError(t *testing.T) { func TestGetKubeletStateFromEtcdError(t *testing.T) {
fakeClient := registry.MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
kubelet := Kubelet{ kubelet := Kubelet{
Client: fakeClient, Client: fakeClient,
} }
channel := make(chan []api.ContainerManifest) channel := make(chan []api.ContainerManifest)
reader := startReading(channel) reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = registry.EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ E: &etcd.EtcdError{
ErrorCode: 200, // non not found error ErrorCode: 200, // non not found error
@ -554,7 +553,7 @@ func TestSyncManifestsDeletes(t *testing.T) {
} }
func TestEventWriting(t *testing.T) { func TestEventWriting(t *testing.T) {
fakeEtcd := registry.MakeFakeEtcdClient(t) fakeEtcd := util.MakeFakeEtcdClient(t)
kubelet := &Kubelet{ kubelet := &Kubelet{
Client: fakeEtcd, Client: fakeEtcd,
} }
@ -581,7 +580,7 @@ func TestEventWriting(t *testing.T) {
} }
func TestEventWritingError(t *testing.T) { func TestEventWritingError(t *testing.T) {
fakeEtcd := registry.MakeFakeEtcdClient(t) fakeEtcd := util.MakeFakeEtcdClient(t)
kubelet := &Kubelet{ kubelet := &Kubelet{
Client: fakeEtcd, Client: fakeEtcd,
} }

View File

@ -13,18 +13,19 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package registry package registry
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"log" "log"
"reflect"
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into // TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
@ -68,11 +69,15 @@ func makePodKey(machine, podID string) string {
return "/registry/hosts/" + machine + "/pods/" + podID return "/registry/hosts/" + machine + "/pods/" + podID
} }
func (registry *EtcdRegistry) helper() *util.EtcdHelper {
return &util.EtcdHelper{registry.etcdClient}
}
func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) { func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) {
pods := []api.Pod{} pods := []api.Pod{}
for _, machine := range registry.machines { for _, machine := range registry.machines {
var machinePods []api.Pod var machinePods []api.Pod
err := registry.extractList("/registry/hosts/"+machine+"/pods", &machinePods) err := registry.helper().ExtractList("/registry/hosts/"+machine+"/pods", &machinePods)
if err != nil { if err != nil {
return pods, err return pods, err
} }
@ -86,80 +91,6 @@ func (registry *EtcdRegistry) ListPods(query labels.Query) ([]api.Pod, error) {
return pods, nil return pods, nil
} }
func (registry *EtcdRegistry) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := registry.etcdClient.Get(key, false, true)
if err != nil {
nodes := make([]*etcd.Node, 0)
if isEtcdNotFound(err) {
return nodes, nil
} else {
return nodes, err
}
}
return result.Node.Nodes, nil
}
// Extract a go object per etcd node into a slice.
func (r *EtcdRegistry) extractList(key string, slicePtr interface{}) error {
nodes, err := r.listEtcdNode(key)
if err != nil {
return err
}
pv := reflect.ValueOf(slicePtr)
if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
v := pv.Elem()
for _, node := range nodes {
obj := reflect.New(v.Type().Elem())
err = json.Unmarshal([]byte(node.Value), obj.Interface())
if err != nil {
return err
}
v.Set(reflect.Append(v, obj.Elem()))
}
return nil
}
// Unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
func (r *EtcdRegistry) extractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
response, err := r.etcdClient.Get(key, false, false)
returnZero := false
if err != nil {
if ignoreNotFound && isEtcdNotFound(err) {
returnZero = true
} else {
return err
}
}
if !returnZero && (response.Node == nil || len(response.Node.Value) == 0) {
if ignoreNotFound {
returnZero = true
} else {
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
}
if returnZero {
pv := reflect.ValueOf(objPtr)
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
return nil
}
return json.Unmarshal([]byte(response.Node.Value), objPtr)
}
// json marshals obj, and stores under key.
func (r *EtcdRegistry) setObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = r.etcdClient.Set(key, string(data), 0)
return err
}
func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) {
pod, _, err := registry.findPod(podID) pod, _, err := registry.findPod(podID)
return &pod, err return &pod, err
@ -170,12 +101,12 @@ func makeContainerKey(machine string) string {
} }
func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) { func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, err error) {
err = registry.extractObj(makeContainerKey(machine), &manifests, true) err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
return return
} }
func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error { func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest) error {
return registry.setObj(makeContainerKey(machine), manifests) return registry.helper().SetObj(makeContainerKey(machine), manifests)
} }
func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
@ -249,7 +180,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID) key := makePodKey(machine, podID)
err = registry.extractObj(key, &pod, false) err = registry.helper().ExtractObj(key, &pod, false)
if err != nil { if err != nil {
return return
} }
@ -267,26 +198,9 @@ func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) {
return api.Pod{}, "", fmt.Errorf("pod not found %s", podID) return api.Pod{}, "", fmt.Errorf("pod not found %s", podID)
} }
func isEtcdNotFound(err error) bool {
if err == nil {
return false
}
switch err.(type) {
case *etcd.EtcdError:
etcdError := err.(*etcd.EtcdError)
if etcdError == nil {
return false
}
if etcdError.ErrorCode == 100 {
return true
}
}
return false
}
func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) {
var controllers []api.ReplicationController var controllers []api.ReplicationController
err := registry.extractList("/registry/controllers", &controllers) err := registry.helper().ExtractList("/registry/controllers", &controllers)
return controllers, err return controllers, err
} }
@ -297,7 +211,7 @@ func makeControllerKey(id string) string {
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) { func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
var controller api.ReplicationController var controller api.ReplicationController
key := makeControllerKey(controllerID) key := makeControllerKey(controllerID)
err := registry.extractObj(key, &controller, false) err := registry.helper().ExtractObj(key, &controller, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -310,7 +224,7 @@ func (registry *EtcdRegistry) CreateController(controller api.ReplicationControl
} }
func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error { func (registry *EtcdRegistry) UpdateController(controller api.ReplicationController) error {
return registry.setObj(makeControllerKey(controller.ID), controller) return registry.helper().SetObj(makeControllerKey(controller.ID), controller)
} }
func (registry *EtcdRegistry) DeleteController(controllerID string) error { func (registry *EtcdRegistry) DeleteController(controllerID string) error {
@ -325,18 +239,18 @@ func makeServiceKey(name string) string {
func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) { func (registry *EtcdRegistry) ListServices() (api.ServiceList, error) {
var list api.ServiceList var list api.ServiceList
err := registry.extractList("/registry/services/specs", &list.Items) err := registry.helper().ExtractList("/registry/services/specs", &list.Items)
return list, err return list, err
} }
func (registry *EtcdRegistry) CreateService(svc api.Service) error { func (registry *EtcdRegistry) CreateService(svc api.Service) error {
return registry.setObj(makeServiceKey(svc.ID), svc) return registry.helper().SetObj(makeServiceKey(svc.ID), svc)
} }
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) { func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
key := makeServiceKey(name) key := makeServiceKey(name)
var svc api.Service var svc api.Service
err := registry.extractObj(key, &svc, false) err := registry.helper().ExtractObj(key, &svc, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -359,5 +273,5 @@ func (registry *EtcdRegistry) UpdateService(svc api.Service) error {
} }
func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error { func (registry *EtcdRegistry) UpdateEndpoints(e api.Endpoints) error {
return registry.setObj("/registry/services/endpoints/"+e.Name, e) return registry.helper().SetObj("/registry/services/endpoints/"+e.Name, e)
} }

View File

@ -26,8 +26,16 @@ import (
"github.com/coreos/go-etcd/etcd" "github.com/coreos/go-etcd/etcd"
) )
func MakeTestEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, machines)
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
}
return registry
}
func TestEtcdGetPod(t *testing.T) { func TestEtcdGetPod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
pod, err := registry.GetPod("foo") pod, err := registry.GetPod("foo")
@ -38,8 +46,8 @@ func TestEtcdGetPod(t *testing.T) {
} }
func TestEtcdGetPodNotFound(t *testing.T) { func TestEtcdGetPodNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -55,8 +63,8 @@ func TestEtcdGetPodNotFound(t *testing.T) {
} }
func TestEtcdCreatePod(t *testing.T) { func TestEtcdCreatePod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -97,8 +105,8 @@ func TestEtcdCreatePod(t *testing.T) {
} }
func TestEtcdCreatePodAlreadyExisting(t *testing.T) { func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}),
@ -118,14 +126,14 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) {
} }
func TestEtcdCreatePodWithContainersError(t *testing.T) { func TestEtcdCreatePodWithContainersError(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
fakeClient.Data["/registry/hosts/machine/kubelet"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -150,14 +158,14 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) {
} }
func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
fakeClient.Data["/registry/hosts/machine/kubelet"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/kubelet"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -198,8 +206,8 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) {
} }
func TestEtcdCreatePodWithExistingContainers(t *testing.T) { func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/hosts/machine/pods/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/hosts/machine/pods/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -245,7 +253,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) {
} }
func TestEtcdDeletePod(t *testing.T) { func TestEtcdDeletePod(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo" key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{
@ -256,11 +264,11 @@ func TestEtcdDeletePod(t *testing.T) {
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeletePod("foo") err := registry.DeletePod("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} }
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
if response.Node.Value != "[]" { if response.Node.Value != "[]" {
@ -269,7 +277,7 @@ func TestEtcdDeletePod(t *testing.T) {
} }
func TestEtcdDeletePodMultipleContainers(t *testing.T) { func TestEtcdDeletePodMultipleContainers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods/foo" key := "/registry/hosts/machine/pods/foo"
fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0)
fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{ fakeClient.Set("/registry/hosts/machine/kubelet", util.MakeJSONString([]api.ContainerManifest{
@ -279,11 +287,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeletePod("foo") err := registry.DeletePod("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} }
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
var manifests []api.ContainerManifest var manifests []api.ContainerManifest
@ -297,9 +305,9 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) {
} }
func TestEtcdEmptyListPods(t *testing.T) { func TestEtcdEmptyListPods(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods" key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{}, Nodes: []*etcd.Node{},
@ -316,9 +324,9 @@ func TestEtcdEmptyListPods(t *testing.T) {
} }
func TestEtcdListPodsNotFound(t *testing.T) { func TestEtcdListPodsNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods" key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
@ -331,9 +339,9 @@ func TestEtcdListPodsNotFound(t *testing.T) {
} }
func TestEtcdListPods(t *testing.T) { func TestEtcdListPods(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/hosts/machine/pods" key := "/registry/hosts/machine/pods"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{ Nodes: []*etcd.Node{
@ -361,9 +369,9 @@ func TestEtcdListPods(t *testing.T) {
} }
func TestEtcdListControllersNotFound(t *testing.T) { func TestEtcdListControllersNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/controllers" key := "/registry/controllers"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
@ -376,9 +384,9 @@ func TestEtcdListControllersNotFound(t *testing.T) {
} }
func TestEtcdListServicesNotFound(t *testing.T) { func TestEtcdListServicesNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/services/specs" key := "/registry/services/specs"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{}, R: &etcd.Response{},
E: &etcd.EtcdError{ErrorCode: 100}, E: &etcd.EtcdError{ErrorCode: 100},
} }
@ -391,9 +399,9 @@ func TestEtcdListServicesNotFound(t *testing.T) {
} }
func TestEtcdListControllers(t *testing.T) { func TestEtcdListControllers(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/controllers" key := "/registry/controllers"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{ Nodes: []*etcd.Node{
@ -417,7 +425,7 @@ func TestEtcdListControllers(t *testing.T) {
} }
func TestEtcdGetController(t *testing.T) { func TestEtcdGetController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
ctrl, err := registry.GetController("foo") ctrl, err := registry.GetController("foo")
@ -428,8 +436,8 @@ func TestEtcdGetController(t *testing.T) {
} }
func TestEtcdGetControllerNotFound(t *testing.T) { func TestEtcdGetControllerNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/controllers/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/controllers/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -448,21 +456,21 @@ func TestEtcdGetControllerNotFound(t *testing.T) {
} }
func TestEtcdDeleteController(t *testing.T) { func TestEtcdDeleteController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteController("foo") err := registry.DeleteController("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 1 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
} }
key := "/registry/controllers/foo" key := "/registry/controllers/foo"
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
} }
func TestEtcdCreateController(t *testing.T) { func TestEtcdCreateController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.CreateController(api.ReplicationController{ err := registry.CreateController(api.ReplicationController{
JSONBase: api.JSONBase{ JSONBase: api.JSONBase{
@ -481,7 +489,7 @@ func TestEtcdCreateController(t *testing.T) {
} }
func TestEtcdUpdateController(t *testing.T) { func TestEtcdUpdateController(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateController(api.ReplicationController{ err := registry.UpdateController(api.ReplicationController{
@ -498,9 +506,9 @@ func TestEtcdUpdateController(t *testing.T) {
} }
func TestEtcdListServices(t *testing.T) { func TestEtcdListServices(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
key := "/registry/services/specs" key := "/registry/services/specs"
fakeClient.Data[key] = EtcdResponseWithError{ fakeClient.Data[key] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: &etcd.Node{ Node: &etcd.Node{
Nodes: []*etcd.Node{ Nodes: []*etcd.Node{
@ -524,8 +532,8 @@ func TestEtcdListServices(t *testing.T) {
} }
func TestEtcdCreateService(t *testing.T) { func TestEtcdCreateService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -547,7 +555,7 @@ func TestEtcdCreateService(t *testing.T) {
} }
func TestEtcdGetService(t *testing.T) { func TestEtcdGetService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
service, err := registry.GetService("foo") service, err := registry.GetService("foo")
@ -558,8 +566,8 @@ func TestEtcdGetService(t *testing.T) {
} }
func TestEtcdGetServiceNotFound(t *testing.T) { func TestEtcdGetServiceNotFound(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Data["/registry/services/specs/foo"] = EtcdResponseWithError{ fakeClient.Data["/registry/services/specs/foo"] = util.EtcdResponseWithError{
R: &etcd.Response{ R: &etcd.Response{
Node: nil, Node: nil,
}, },
@ -575,25 +583,25 @@ func TestEtcdGetServiceNotFound(t *testing.T) {
} }
func TestEtcdDeleteService(t *testing.T) { func TestEtcdDeleteService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.DeleteService("foo") err := registry.DeleteService("foo")
expectNoError(t, err) expectNoError(t, err)
if len(fakeClient.deletedKeys) != 2 { if len(fakeClient.DeletedKeys) != 2 {
t.Errorf("Expected 2 delete, found %#v", fakeClient.deletedKeys) t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
} }
key := "/registry/services/specs/foo" key := "/registry/services/specs/foo"
if fakeClient.deletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
key = "/registry/services/endpoints/foo" key = "/registry/services/endpoints/foo"
if fakeClient.deletedKeys[1] != key { if fakeClient.DeletedKeys[1] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.deletedKeys[1], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], key)
} }
} }
func TestEtcdUpdateService(t *testing.T) { func TestEtcdUpdateService(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
err := registry.UpdateService(api.Service{ err := registry.UpdateService(api.Service{
@ -610,7 +618,7 @@ func TestEtcdUpdateService(t *testing.T) {
} }
func TestEtcdUpdateEndpoints(t *testing.T) { func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t) fakeClient := util.MakeFakeEtcdClient(t)
registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints := api.Endpoints{ endpoints := api.Endpoints{
Name: "foo", Name: "foo",

128
pkg/util/etcd_tools.go Normal file
View File

@ -0,0 +1,128 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"reflect"
"github.com/coreos/go-etcd/etcd"
)
// Interface exposing only the etcd operations needed by EtcdHelper.
type EtcdGetSet interface {
Get(key string, sort, recursive bool) (*etcd.Response, error)
Set(key, value string, ttl uint64) (*etcd.Response, error)
}
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
type EtcdHelper struct {
Client EtcdGetSet
}
// Returns true iff err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
if err == nil {
return false
}
switch err.(type) {
case *etcd.EtcdError:
etcdError := err.(*etcd.EtcdError)
if etcdError == nil {
return false
}
if etcdError.ErrorCode == 100 {
return true
}
}
return false
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, error) {
result, err := h.Client.Get(key, false, true)
if err != nil {
nodes := make([]*etcd.Node, 0)
if IsEtcdNotFound(err) {
return nodes, nil
} else {
return nodes, err
}
}
return result.Node.Nodes, nil
}
// Extract a go object per etcd node into a slice.
func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
nodes, err := h.listEtcdNode(key)
if err != nil {
return err
}
pv := reflect.ValueOf(slicePtr)
if pv.Type().Kind() != reflect.Ptr || pv.Type().Elem().Kind() != reflect.Slice {
// This should not happen at runtime.
panic("need ptr to slice")
}
v := pv.Elem()
for _, node := range nodes {
obj := reflect.New(v.Type().Elem())
err = json.Unmarshal([]byte(node.Value), obj.Interface())
if err != nil {
return err
}
v.Set(reflect.Append(v, obj.Elem()))
}
return nil
}
// Unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
response, err := h.Client.Get(key, false, false)
returnZero := false
if err != nil {
if ignoreNotFound && IsEtcdNotFound(err) {
returnZero = true
} else {
return err
}
}
if !returnZero && (response.Node == nil || len(response.Node.Value) == 0) {
if ignoreNotFound {
returnZero = true
} else {
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
}
if returnZero {
pv := reflect.ValueOf(objPtr)
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
return nil
}
return json.Unmarshal([]byte(response.Node.Value), objPtr)
}
// SetObj marshals obj via json, and stores under key.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = h.Client.Set(key, string(data), 0)
return err
}

135
pkg/util/etcd_tools_test.go Normal file
View File

@ -0,0 +1,135 @@
package util
import (
"fmt"
"reflect"
"testing"
"github.com/coreos/go-etcd/etcd"
)
type fakeEtcdGetSet struct {
get func(key string, sort, recursive bool) (*etcd.Response, error)
set func(key, value string, ttl uint64) (*etcd.Response, error)
}
func TestIsNotFoundErr(t *testing.T) {
try := func(err error, isNotFound bool) {
if IsEtcdNotFound(err) != isNotFound {
t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound)
}
}
try(&etcd.EtcdError{ErrorCode: 100}, true)
try(&etcd.EtcdError{ErrorCode: 101}, false)
try(nil, false)
try(fmt.Errorf("some other kind of error"), false)
}
type testMarshalType struct {
ID string `json:"id"`
}
func TestExtractList(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: `{"id":"foo"}`,
},
{
Value: `{"id":"bar"}`,
},
{
Value: `{"id":"baz"}`,
},
},
},
},
}
expect := []testMarshalType{
{"foo"},
{"bar"},
{"baz"},
}
var got []testMarshalType
helper := EtcdHelper{fakeClient}
err := helper.ExtractList("/some/key", &got)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if !reflect.DeepEqual(got, expect) {
t.Errorf("Wanted %#v, got %#v", expect, got)
}
}
func TestExtractObj(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
expect := testMarshalType{ID: "foo"}
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient}
var got testMarshalType
err := helper.ExtractObj("/some/key", &got, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
if !reflect.DeepEqual(got, expect) {
t.Errorf("Wanted %#v, got %#v", expect, got)
}
}
func TestExtractObjNotFoundErr(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
fakeClient.Data["/some/key"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{
ErrorCode: 100,
},
}
fakeClient.Data["/some/key2"] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
}
fakeClient.Data["/some/key3"] = EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: "",
},
},
}
helper := EtcdHelper{fakeClient}
try := func(key string) {
var got testMarshalType
err := helper.ExtractObj(key, &got, false)
if err == nil {
t.Errorf("%s: wanted error but didn't get one", key)
}
err = helper.ExtractObj(key, &got, true)
if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err)
}
}
try("/some/key")
try("/some/key2")
try("/some/key3")
}
func TestSetObj(t *testing.T) {
obj := testMarshalType{ID: "foo"}
fakeClient := MakeFakeEtcdClient(t)
helper := EtcdHelper{fakeClient}
err := helper.SetObj("/some/key", obj)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
expect := MakeJSONString(obj)
got := fakeClient.Data["/some/key"].R.Node.Value
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
}

View File

@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package registry package util
import ( import (
"fmt" "fmt"
@ -29,7 +29,7 @@ type EtcdResponseWithError struct {
type FakeEtcdClient struct { type FakeEtcdClient struct {
Data map[string]EtcdResponseWithError Data map[string]EtcdResponseWithError
deletedKeys []string DeletedKeys []string
Err error Err error
t *testing.T t *testing.T
Ix int Ix int
@ -71,18 +71,10 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response,
return f.Set(key, value, ttl) return f.Set(key, value, ttl)
} }
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
f.deletedKeys = append(f.deletedKeys, key) f.DeletedKeys = append(f.DeletedKeys, key)
return &etcd.Response{}, f.Err return &etcd.Response{}, f.Err
} }
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
return nil, fmt.Errorf("unimplemented") return nil, fmt.Errorf("unimplemented")
} }
func MakeTestEtcdRegistry(client EtcdClient, machines []string) *EtcdRegistry {
registry := MakeEtcdRegistry(client, machines)
registry.manifestFactory = &BasicManifestFactory{
serviceRegistry: &MockServiceRegistry{},
}
return registry
}