From b05bc22a62c07fb0006f5e45b02657edc8fde50e Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Mon, 9 Jun 2014 13:47:25 -0700 Subject: [PATCH] Refactor the kubelet for testability. Add unit tests. Test coverage to 56.9% --- pkg/kubelet/kubelet.go | 165 ++++++++------ pkg/kubelet/kubelet_test.go | 377 ++++++++++++++++++++++++++++--- pkg/registry/fake_etcd_client.go | 10 +- 3 files changed, 441 insertions(+), 111 deletions(-) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 627439d98f..c58ebf977a 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -21,10 +21,12 @@ import ( "bytes" "encoding/json" "fmt" + "io" "io/ioutil" "log" "math/rand" "net/http" + "os" "os/exec" "strconv" "strings" @@ -237,18 +239,15 @@ func dockerNameToManifestAndContainer(name string) (manifestId, containerName st return } -func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) { - err = kl.pullImage(container.Image) - if err != nil { - return "", err - } - - name = manifestAndContainerToDockerName(manifest, container) - envVariables := []string{} +func makeEnvironmentVariables(container *api.Container) []string { + var result []string for _, value := range container.Env { - envVariables = append(envVariables, fmt.Sprintf("%s=%s", value.Name, value.Value)) + result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value)) } + return result +} +func makeVolumesAndBinds(container *api.Container) (map[string]struct{}, []string) { volumes := map[string]struct{}{} binds := []string{} for _, volume := range container.VolumeMounts { @@ -259,7 +258,10 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api. } binds = append(binds, basePath) } + return volumes, binds +} +func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) { exposedPorts := map[docker.Port]struct{}{} portBindings := map[docker.Port][]docker.PortBinding{} for _, port := range container.Ports { @@ -275,10 +277,24 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api. }, } } + return exposedPorts, portBindings +} + +func makeCommandLine(container *api.Container) []string { var cmdList []string if len(container.Command) > 0 { cmdList = strings.Split(container.Command, " ") } + return cmdList +} + +func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) { + name = manifestAndContainerToDockerName(manifest, container) + + envVariables := makeEnvironmentVariables(container) + volumes, binds := makeVolumesAndBinds(container) + exposedPorts, portBindings := makePortsAndBindings(container) + opts := docker.CreateContainerOptions{ Name: name, Config: &docker.Config{ @@ -287,7 +303,7 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api. Env: envVariables, Volumes: volumes, WorkingDir: container.WorkingDir, - Cmd: cmdList, + Cmd: makeCommandLine(container), }, } dockerContainer, err := kl.DockerClient.CreateContainer(opts) @@ -320,76 +336,78 @@ func (kl *Kubelet) KillContainer(name string) error { return err } +func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { + var file *os.File + var err error + if file, err = os.Open(name); err != nil { + return lastData, err + } + + return kl.extractFromReader(lastData, file, changeChannel) +} + +func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) { + var manifest api.ContainerManifest + data, err := ioutil.ReadAll(reader) + if err != nil { + log.Printf("Couldn't read file: %v", err) + return lastData, err + } + if err = kl.ExtractYAMLData(data, &manifest); err != nil { + return lastData, err + } + if !bytes.Equal(lastData, data) { + lastData = data + // Ok, we have a valid configuration, send to channel for + // rejiggering. + changeChannel <- manifest + return data, nil + } + return lastData, nil +} + // Watch a file for changes to the set of pods that should run on this Kubelet // This function loops forever and is intended to be run as a goroutine func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) { var lastData []byte for { + var err error time.Sleep(kl.FileCheckFrequency) - var manifest api.ContainerManifest - data, err := ioutil.ReadFile(file) + lastData, err = kl.extractFromFile(lastData, file, changeChannel) if err != nil { - log.Printf("Couldn't read file: %s : %v", file, err) - continue - } - if err = kl.ExtractYAMLData(data, &manifest); err != nil { - continue - } - if !bytes.Equal(lastData, data) { - lastData = data - // Ok, we have a valid configuration, send to channel for - // rejiggering. - changeChannel <- manifest - continue + log.Printf("Error polling file: %#v", err) } } } +func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) { + client := &http.Client{} + request, err := http.NewRequest("GET", url, nil) + if err != nil { + return lastData, err + } + response, err := client.Do(request) + if err != nil { + return lastData, err + } + defer response.Body.Close() + return kl.extractFromReader(lastData, response.Body, changeChannel) +} + // Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet // This function runs forever and is intended to be run as a goroutine func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) { var lastData []byte - client := &http.Client{} for { + var err error time.Sleep(kl.HTTPCheckFrequency) - var config api.ContainerManifest - data, err := kl.SyncHTTP(client, url, &config) - log.Printf("Containers: %#v", config) + lastData, err = kl.extractFromHTTP(lastData, url, changeChannel) if err != nil { - log.Printf("Error syncing HTTP: %#v", err) - continue - } - if !bytes.Equal(lastData, data) { - lastData = data - changeChannel <- config - continue + log.Printf("Error syncing http: %#v", err) } } } -// SyncHTTP reads from url a yaml manifest and populates config. Returns the -// raw bytes, if something was read. Returns an error if something goes wrong. -// 'client' is used to execute the request, to allow caching of clients. -func (kl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) { - request, err := http.NewRequest("GET", url, nil) - if err != nil { - return nil, err - } - response, err := client.Do(request) - if err != nil { - return nil, err - } - defer response.Body.Close() - body, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil, err - } - if err = kl.ExtractYAMLData(body, &config); err != nil { - return body, err - } - return body, nil -} - // Take an etcd Response object, and turn it into a structured list of containers // Return a list of containers, or an error if one occurs. func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) { @@ -473,30 +491,32 @@ func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error { return nil } +func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) { + var manifests []api.ContainerManifest + if response.Node == nil || len(response.Node.Value) == 0 { + return manifests, fmt.Errorf("No nodes field: %#v", response) + } + err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests) + return manifests, err +} + // Watch etcd for changes, receives config objects from the etcd client watch. -// This function loops forever and is intended to be run as a goroutine. +// This function loops until the watchChannel is closed, and is intended to be run as a goroutine. func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) { defer util.HandleCrash() for { watchResponse := <-watchChannel log.Printf("Got change: %#v", watchResponse) - // This means the channel has been closed. if watchResponse == nil { return } - - if watchResponse.Node == nil || len(watchResponse.Node.Value) == 0 { - log.Printf("No nodes field: %#v", watchResponse) - if watchResponse.Node != nil { - log.Printf("Node: %#v", watchResponse.Node) - } - } - log.Printf("Got data: %v", watchResponse.Node.Value) - var manifests []api.ContainerManifest - if err := kl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil { + manifests, err := kl.extractFromEtcd(watchResponse) + if err != nil { + log.Printf("Error handling response from etcd: %#v", err) continue } + log.Printf("manifests: %#v", manifests) // Ok, we have a valid configuration, send to channel for // rejiggering. changeChannel <- manifests @@ -518,6 +538,11 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } if !exists { log.Printf("%#v doesn't exist, creating", element) + err = kl.pullImage(element.Image) + if err != nil { + log.Printf("Error pulling container: %#v", err) + continue + } actualName, err = kl.RunContainer(&manifest, &element) // For some reason, list gives back names that start with '/' actualName = "/" + actualName diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5f73b32ab6..659204f557 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -16,10 +16,13 @@ limitations under the License. package kubelet import ( + "bytes" "encoding/json" "fmt" - "net/http" + "io/ioutil" "net/http/httptest" + "reflect" + "strings" "sync" "testing" @@ -191,6 +194,14 @@ func TestContainerExists(t *testing.T) { if err != nil { t.Errorf("Unexpected error: %#v", err) } + + fakeDocker.clearCalls() + missingManifest := api.ContainerManifest{Id: "foobar"} + exists, _, err = kubelet.ContainerExists(&missingManifest, &container) + verifyCalls(t, fakeDocker, []string{"list"}) + if exists { + t.Errorf("Failed to not find container %#v, missingManifest") + } } func TestGetContainerID(t *testing.T) { @@ -322,42 +333,6 @@ func TestKillContainer(t *testing.T) { verifyCalls(t, fakeDocker, []string{"list", "stop"}) } -func TestSyncHTTP(t *testing.T) { - containers := api.ContainerManifest{ - Containers: []api.Container{ - { - Name: "foo", - Image: "dockerfile/foo", - }, - { - Name: "bar", - Image: "dockerfile/bar", - }, - }, - } - data, _ := json.Marshal(containers) - fakeHandler := util.FakeHandler{ - StatusCode: 200, - ResponseBody: string(data), - } - testServer := httptest.NewServer(&fakeHandler) - kubelet := Kubelet{} - - var containersOut api.ContainerManifest - data, err := kubelet.SyncHTTP(&http.Client{}, testServer.URL, &containersOut) - if err != nil { - t.Errorf("Unexpected error: %#v", err) - } - if len(containers.Containers) != len(containersOut.Containers) { - t.Errorf("Container sizes don't match. Expected: %d Received %d, %#v", len(containers.Containers), len(containersOut.Containers), containersOut) - } - expectedData, _ := json.Marshal(containers) - actualData, _ := json.Marshal(containersOut) - if string(expectedData) != string(actualData) { - t.Errorf("Container data doesn't match. Expected: %s Received %s", string(expectedData), string(actualData)) - } -} - func TestResponseToContainersNil(t *testing.T) { kubelet := Kubelet{} list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil}) @@ -560,3 +535,331 @@ func TestSyncManifestsDeletes(t *testing.T) { t.Errorf("Unexpected call sequence: %#v", fakeDocker.called) } } + +func TestEventWriting(t *testing.T) { + fakeEtcd := registry.MakeFakeEtcdClient(t) + kubelet := &Kubelet{ + Client: fakeEtcd, + } + expectedEvent := api.Event{ + Event: "test", + Container: &api.Container{ + Name: "foo", + }, + } + err := kubelet.LogEvent(&expectedEvent) + expectNoError(t, err) + if fakeEtcd.Ix != 1 { + t.Errorf("Unexpected number of children added: %d, expected 1", fakeEtcd.Ix) + } + response, err := fakeEtcd.Get("/events/foo/1", false, false) + expectNoError(t, err) + var event api.Event + err = json.Unmarshal([]byte(response.Node.Value), &event) + expectNoError(t, err) + if event.Event != expectedEvent.Event || + event.Container.Name != expectedEvent.Container.Name { + t.Errorf("Event's don't match. Expected: %#v Saw: %#v", expectedEvent, event) + } +} + +func TestEventWritingError(t *testing.T) { + fakeEtcd := registry.MakeFakeEtcdClient(t) + kubelet := &Kubelet{ + Client: fakeEtcd, + } + fakeEtcd.Err = fmt.Errorf("Test error") + err := kubelet.LogEvent(&api.Event{ + Event: "test", + Container: &api.Container{ + Name: "foo", + }, + }) + if err == nil { + t.Errorf("Unexpected non-error") + } +} + +func TestMakeCommandLine(t *testing.T) { + expected := []string{"echo", "hello", "world"} + container := api.Container{ + Command: strings.Join(expected, " "), + } + cmdLine := makeCommandLine(&container) + if !reflect.DeepEqual(expected, cmdLine) { + t.Error("Unexpected command line. Expected %#v, got %#v", expected, cmdLine) + } +} + +func TestMakeEnvVariables(t *testing.T) { + container := api.Container{ + Env: []api.EnvVar{ + api.EnvVar{ + Name: "foo", + Value: "bar", + }, + api.EnvVar{ + Name: "baz", + Value: "blah", + }, + }, + } + vars := makeEnvironmentVariables(&container) + if len(vars) != len(container.Env) { + t.Errorf("Vars don't match. Expected: %#v Found: %#v", container.Env, vars) + } + for ix, env := range container.Env { + value := fmt.Sprintf("%s=%s", env.Name, env.Value) + if value != vars[ix] { + t.Errorf("Unexpected value: %s. Expected: %s", vars[ix], value) + } + } +} + +func TestMakeVolumesAndBinds(t *testing.T) { + container := api.Container{ + VolumeMounts: []api.VolumeMount{ + api.VolumeMount{ + MountPath: "/mnt/path", + Name: "disk", + ReadOnly: false, + }, + api.VolumeMount{ + MountPath: "/mnt/path2", + Name: "disk2", + ReadOnly: true, + }, + }, + } + volumes, binds := makeVolumesAndBinds(&container) + if len(volumes) != len(container.VolumeMounts) || + len(binds) != len(container.VolumeMounts) { + t.Errorf("Unexpected volumes and binds: %#v %#v. Container was: %#v", volumes, binds, container) + } + for ix, volume := range container.VolumeMounts { + expectedBind := "/exports/" + volume.Name + ":" + volume.MountPath + if volume.ReadOnly { + expectedBind = expectedBind + ":ro" + } + if binds[ix] != expectedBind { + t.Errorf("Unexpected bind. Expected %s. Found %s", expectedBind, binds[ix]) + } + if _, ok := volumes[volume.MountPath]; !ok { + t.Errorf("Map is missing key: %s. %#v", volume.MountPath, volumes) + } + } +} + +func TestMakePortsAndBindings(t *testing.T) { + container := api.Container{ + Ports: []api.Port{ + api.Port{ + ContainerPort: 80, + HostPort: 8080, + }, + api.Port{ + ContainerPort: 443, + HostPort: 443, + }, + }, + } + exposedPorts, bindings := makePortsAndBindings(&container) + if len(container.Ports) != len(exposedPorts) || + len(container.Ports) != len(bindings) { + t.Errorf("Unexpected ports and bindings, %#v %#v %#v", container, exposedPorts, bindings) + } +} + +func TestExtractFromNonExistentFile(t *testing.T) { + kubelet := Kubelet{} + changeChannel := make(chan api.ContainerManifest) + lastData := []byte{1, 2, 3} + data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel) + if err == nil { + t.Error("Unexpected non-error.") + } + if !bytes.Equal(data, lastData) { + t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + } +} + +func TestExtractFromBadDataFile(t *testing.T) { + kubelet := Kubelet{} + changeChannel := make(chan api.ContainerManifest) + lastData := []byte{1, 2, 3} + file, err := ioutil.TempFile("", "foo") + expectNoError(t, err) + name := file.Name() + file.Close() + ioutil.WriteFile(name, lastData, 0755) + data, err := kubelet.extractFromFile(lastData, name, changeChannel) + + if err == nil { + t.Error("Unexpected non-error.") + } + if !bytes.Equal(data, lastData) { + t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + } +} + +func TestExtractFromSameDataFile(t *testing.T) { + kubelet := Kubelet{} + changeChannel := make(chan api.ContainerManifest) + manifest := api.ContainerManifest{ + Id: "foo", + } + lastData, err := json.Marshal(manifest) + expectNoError(t, err) + file, err := ioutil.TempFile("", "foo") + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, lastData, 0755) + data, err := kubelet.extractFromFile(lastData, name, changeChannel) + + expectNoError(t, err) + if !bytes.Equal(data, lastData) { + t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + } +} + +func TestExtractFromChangedDataFile(t *testing.T) { + kubelet := Kubelet{} + changeChannel := make(chan api.ContainerManifest) + reader := startReadingSingle(changeChannel) + oldManifest := api.ContainerManifest{ + Id: "foo", + } + newManifest := api.ContainerManifest{ + Id: "bar", + } + lastData, err := json.Marshal(oldManifest) + expectNoError(t, err) + newData, err := json.Marshal(newManifest) + expectNoError(t, err) + file, err := ioutil.TempFile("", "foo") + expectNoError(t, err) + name := file.Name() + expectNoError(t, file.Close()) + ioutil.WriteFile(name, newData, 0755) + data, err := kubelet.extractFromFile(lastData, name, changeChannel) + close(changeChannel) + + expectNoError(t, err) + if !bytes.Equal(data, newData) { + t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data) + } + read := reader.GetList() + if len(read) != 1 { + t.Errorf("Unexpected channel traffic: %#v", read) + } + if !reflect.DeepEqual(read[0], newManifest) { + t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0]) + } +} + +func TestExtractFromHttpBadness(t *testing.T) { + kubelet := Kubelet{} + lastData := []byte{1, 2, 3} + changeChannel := make(chan api.ContainerManifest) + data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel) + if err == nil { + t.Error("Unexpected non-error.") + } + if !bytes.Equal(lastData, data) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) + } +} + +func TestExtractFromHttpNoChange(t *testing.T) { + kubelet := Kubelet{} + changeChannel := make(chan api.ContainerManifest) + + manifest := api.ContainerManifest{ + Id: "foo", + } + lastData, err := json.Marshal(manifest) + + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(lastData), + } + testServer := httptest.NewServer(&fakeHandler) + + data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if !bytes.Equal(lastData, data) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) + } +} + +func TestExtractFromHttpChanges(t *testing.T) { + kubelet := Kubelet{} + changeChannel := make(chan api.ContainerManifest) + reader := startReadingSingle(changeChannel) + + manifest := api.ContainerManifest{ + Id: "foo", + } + newManifest := api.ContainerManifest{ + Id: "bar", + } + lastData, _ := json.Marshal(manifest) + newData, _ := json.Marshal(newManifest) + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(newData), + } + testServer := httptest.NewServer(&fakeHandler) + + data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel) + close(changeChannel) + + read := reader.GetList() + + if err != nil { + t.Errorf("Unexpected error: %#v", err) + } + if len(read) != 1 { + t.Errorf("Unexpected list: %#v", read) + } + if !bytes.Equal(newData, data) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data) + } + if !reflect.DeepEqual(newManifest, read[0]) { + t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0]) + } +} + +func TestWatchEtcd(t *testing.T) { + watchChannel := make(chan *etcd.Response) + changeChannel := make(chan []api.ContainerManifest) + kubelet := Kubelet{} + reader := startReading(changeChannel) + + manifest := []api.ContainerManifest{ + api.ContainerManifest{ + Id: "foo", + }, + } + data, err := json.Marshal(manifest) + expectNoError(t, err) + + go kubelet.WatchEtcd(watchChannel, changeChannel) + + watchChannel <- &etcd.Response{ + Node: &etcd.Node{ + Value: string(data), + }, + } + close(watchChannel) + close(changeChannel) + + read := reader.GetList() + if len(read) != 1 || + !reflect.DeepEqual(read[0], manifest) { + t.Errorf("Unexpected manifest(s) %#v %#v", read[0], manifest) + } +} diff --git a/pkg/registry/fake_etcd_client.go b/pkg/registry/fake_etcd_client.go index 95a4084095..c03a6443c4 100644 --- a/pkg/registry/fake_etcd_client.go +++ b/pkg/registry/fake_etcd_client.go @@ -30,8 +30,9 @@ type EtcdResponseWithError struct { type FakeEtcdClient struct { Data map[string]EtcdResponseWithError deletedKeys []string - err error + Err error t *testing.T + Ix int } func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { @@ -42,7 +43,8 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { } func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) { - return f.Set(key, data, ttl) + f.Ix = f.Ix + 1 + return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl) } func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) { @@ -63,14 +65,14 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err }, } f.Data[key] = result - return result.R, f.err + return result.R, f.Err } func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) { return f.Set(key, value, ttl) } func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { f.deletedKeys = append(f.deletedKeys, key) - return &etcd.Response{}, f.err + return &etcd.Response{}, f.Err } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {