Refactor the kubelet for testability.

Add unit tests.  Test coverage to 56.9%
pull/6/head
Brendan Burns 2014-06-09 13:47:25 -07:00
parent 54d1e7e86d
commit b05bc22a62
3 changed files with 441 additions and 111 deletions

View File

@ -21,10 +21,12 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"math/rand"
"net/http"
"os"
"os/exec"
"strconv"
"strings"
@ -237,18 +239,15 @@ func dockerNameToManifestAndContainer(name string) (manifestId, containerName st
return
}
func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) {
err = kl.pullImage(container.Image)
if err != nil {
return "", err
}
name = manifestAndContainerToDockerName(manifest, container)
envVariables := []string{}
func makeEnvironmentVariables(container *api.Container) []string {
var result []string
for _, value := range container.Env {
envVariables = append(envVariables, fmt.Sprintf("%s=%s", value.Name, value.Value))
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
}
return result
}
func makeVolumesAndBinds(container *api.Container) (map[string]struct{}, []string) {
volumes := map[string]struct{}{}
binds := []string{}
for _, volume := range container.VolumeMounts {
@ -259,7 +258,10 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
}
binds = append(binds, basePath)
}
return volumes, binds
}
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
exposedPorts := map[docker.Port]struct{}{}
portBindings := map[docker.Port][]docker.PortBinding{}
for _, port := range container.Ports {
@ -275,10 +277,24 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
},
}
}
return exposedPorts, portBindings
}
func makeCommandLine(container *api.Container) []string {
var cmdList []string
if len(container.Command) > 0 {
cmdList = strings.Split(container.Command, " ")
}
return cmdList
}
func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) {
name = manifestAndContainerToDockerName(manifest, container)
envVariables := makeEnvironmentVariables(container)
volumes, binds := makeVolumesAndBinds(container)
exposedPorts, portBindings := makePortsAndBindings(container)
opts := docker.CreateContainerOptions{
Name: name,
Config: &docker.Config{
@ -287,7 +303,7 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
Env: envVariables,
Volumes: volumes,
WorkingDir: container.WorkingDir,
Cmd: cmdList,
Cmd: makeCommandLine(container),
},
}
dockerContainer, err := kl.DockerClient.CreateContainer(opts)
@ -320,76 +336,78 @@ func (kl *Kubelet) KillContainer(name string) error {
return err
}
func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
var file *os.File
var err error
if file, err = os.Open(name); err != nil {
return lastData, err
}
return kl.extractFromReader(lastData, file, changeChannel)
}
func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
var manifest api.ContainerManifest
data, err := ioutil.ReadAll(reader)
if err != nil {
log.Printf("Couldn't read file: %v", err)
return lastData, err
}
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
return lastData, err
}
if !bytes.Equal(lastData, data) {
lastData = data
// Ok, we have a valid configuration, send to channel for
// rejiggering.
changeChannel <- manifest
return data, nil
}
return lastData, nil
}
// Watch a file for changes to the set of pods that should run on this Kubelet
// This function loops forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {
var lastData []byte
for {
var err error
time.Sleep(kl.FileCheckFrequency)
var manifest api.ContainerManifest
data, err := ioutil.ReadFile(file)
lastData, err = kl.extractFromFile(lastData, file, changeChannel)
if err != nil {
log.Printf("Couldn't read file: %s : %v", file, err)
continue
}
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
continue
}
if !bytes.Equal(lastData, data) {
lastData = data
// Ok, we have a valid configuration, send to channel for
// rejiggering.
changeChannel <- manifest
continue
log.Printf("Error polling file: %#v", err)
}
}
}
func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
client := &http.Client{}
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return lastData, err
}
response, err := client.Do(request)
if err != nil {
return lastData, err
}
defer response.Body.Close()
return kl.extractFromReader(lastData, response.Body, changeChannel)
}
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
// This function runs forever and is intended to be run as a goroutine
func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) {
var lastData []byte
client := &http.Client{}
for {
var err error
time.Sleep(kl.HTTPCheckFrequency)
var config api.ContainerManifest
data, err := kl.SyncHTTP(client, url, &config)
log.Printf("Containers: %#v", config)
lastData, err = kl.extractFromHTTP(lastData, url, changeChannel)
if err != nil {
log.Printf("Error syncing HTTP: %#v", err)
continue
}
if !bytes.Equal(lastData, data) {
lastData = data
changeChannel <- config
continue
log.Printf("Error syncing http: %#v", err)
}
}
}
// SyncHTTP reads from url a yaml manifest and populates config. Returns the
// raw bytes, if something was read. Returns an error if something goes wrong.
// 'client' is used to execute the request, to allow caching of clients.
func (kl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) {
request, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
response, err := client.Do(request)
if err != nil {
return nil, err
}
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}
if err = kl.ExtractYAMLData(body, &config); err != nil {
return body, err
}
return body, nil
}
// Take an etcd Response object, and turn it into a structured list of containers
// Return a list of containers, or an error if one occurs.
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
@ -473,30 +491,32 @@ func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
return nil
}
func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
if response.Node == nil || len(response.Node.Value) == 0 {
return manifests, fmt.Errorf("No nodes field: %#v", response)
}
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
return manifests, err
}
// Watch etcd for changes, receives config objects from the etcd client watch.
// This function loops forever and is intended to be run as a goroutine.
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) {
defer util.HandleCrash()
for {
watchResponse := <-watchChannel
log.Printf("Got change: %#v", watchResponse)
// This means the channel has been closed.
if watchResponse == nil {
return
}
if watchResponse.Node == nil || len(watchResponse.Node.Value) == 0 {
log.Printf("No nodes field: %#v", watchResponse)
if watchResponse.Node != nil {
log.Printf("Node: %#v", watchResponse.Node)
}
}
log.Printf("Got data: %v", watchResponse.Node.Value)
var manifests []api.ContainerManifest
if err := kl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil {
manifests, err := kl.extractFromEtcd(watchResponse)
if err != nil {
log.Printf("Error handling response from etcd: %#v", err)
continue
}
log.Printf("manifests: %#v", manifests)
// Ok, we have a valid configuration, send to channel for
// rejiggering.
changeChannel <- manifests
@ -518,6 +538,11 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
}
if !exists {
log.Printf("%#v doesn't exist, creating", element)
err = kl.pullImage(element.Image)
if err != nil {
log.Printf("Error pulling container: %#v", err)
continue
}
actualName, err = kl.RunContainer(&manifest, &element)
// For some reason, list gives back names that start with '/'
actualName = "/" + actualName

View File

@ -16,10 +16,13 @@ limitations under the License.
package kubelet
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"io/ioutil"
"net/http/httptest"
"reflect"
"strings"
"sync"
"testing"
@ -191,6 +194,14 @@ func TestContainerExists(t *testing.T) {
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
fakeDocker.clearCalls()
missingManifest := api.ContainerManifest{Id: "foobar"}
exists, _, err = kubelet.ContainerExists(&missingManifest, &container)
verifyCalls(t, fakeDocker, []string{"list"})
if exists {
t.Errorf("Failed to not find container %#v, missingManifest")
}
}
func TestGetContainerID(t *testing.T) {
@ -322,42 +333,6 @@ func TestKillContainer(t *testing.T) {
verifyCalls(t, fakeDocker, []string{"list", "stop"})
}
func TestSyncHTTP(t *testing.T) {
containers := api.ContainerManifest{
Containers: []api.Container{
{
Name: "foo",
Image: "dockerfile/foo",
},
{
Name: "bar",
Image: "dockerfile/bar",
},
},
}
data, _ := json.Marshal(containers)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
kubelet := Kubelet{}
var containersOut api.ContainerManifest
data, err := kubelet.SyncHTTP(&http.Client{}, testServer.URL, &containersOut)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if len(containers.Containers) != len(containersOut.Containers) {
t.Errorf("Container sizes don't match. Expected: %d Received %d, %#v", len(containers.Containers), len(containersOut.Containers), containersOut)
}
expectedData, _ := json.Marshal(containers)
actualData, _ := json.Marshal(containersOut)
if string(expectedData) != string(actualData) {
t.Errorf("Container data doesn't match. Expected: %s Received %s", string(expectedData), string(actualData))
}
}
func TestResponseToContainersNil(t *testing.T) {
kubelet := Kubelet{}
list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil})
@ -560,3 +535,331 @@ func TestSyncManifestsDeletes(t *testing.T) {
t.Errorf("Unexpected call sequence: %#v", fakeDocker.called)
}
}
func TestEventWriting(t *testing.T) {
fakeEtcd := registry.MakeFakeEtcdClient(t)
kubelet := &Kubelet{
Client: fakeEtcd,
}
expectedEvent := api.Event{
Event: "test",
Container: &api.Container{
Name: "foo",
},
}
err := kubelet.LogEvent(&expectedEvent)
expectNoError(t, err)
if fakeEtcd.Ix != 1 {
t.Errorf("Unexpected number of children added: %d, expected 1", fakeEtcd.Ix)
}
response, err := fakeEtcd.Get("/events/foo/1", false, false)
expectNoError(t, err)
var event api.Event
err = json.Unmarshal([]byte(response.Node.Value), &event)
expectNoError(t, err)
if event.Event != expectedEvent.Event ||
event.Container.Name != expectedEvent.Container.Name {
t.Errorf("Event's don't match. Expected: %#v Saw: %#v", expectedEvent, event)
}
}
func TestEventWritingError(t *testing.T) {
fakeEtcd := registry.MakeFakeEtcdClient(t)
kubelet := &Kubelet{
Client: fakeEtcd,
}
fakeEtcd.Err = fmt.Errorf("Test error")
err := kubelet.LogEvent(&api.Event{
Event: "test",
Container: &api.Container{
Name: "foo",
},
})
if err == nil {
t.Errorf("Unexpected non-error")
}
}
func TestMakeCommandLine(t *testing.T) {
expected := []string{"echo", "hello", "world"}
container := api.Container{
Command: strings.Join(expected, " "),
}
cmdLine := makeCommandLine(&container)
if !reflect.DeepEqual(expected, cmdLine) {
t.Error("Unexpected command line. Expected %#v, got %#v", expected, cmdLine)
}
}
func TestMakeEnvVariables(t *testing.T) {
container := api.Container{
Env: []api.EnvVar{
api.EnvVar{
Name: "foo",
Value: "bar",
},
api.EnvVar{
Name: "baz",
Value: "blah",
},
},
}
vars := makeEnvironmentVariables(&container)
if len(vars) != len(container.Env) {
t.Errorf("Vars don't match. Expected: %#v Found: %#v", container.Env, vars)
}
for ix, env := range container.Env {
value := fmt.Sprintf("%s=%s", env.Name, env.Value)
if value != vars[ix] {
t.Errorf("Unexpected value: %s. Expected: %s", vars[ix], value)
}
}
}
func TestMakeVolumesAndBinds(t *testing.T) {
container := api.Container{
VolumeMounts: []api.VolumeMount{
api.VolumeMount{
MountPath: "/mnt/path",
Name: "disk",
ReadOnly: false,
},
api.VolumeMount{
MountPath: "/mnt/path2",
Name: "disk2",
ReadOnly: true,
},
},
}
volumes, binds := makeVolumesAndBinds(&container)
if len(volumes) != len(container.VolumeMounts) ||
len(binds) != len(container.VolumeMounts) {
t.Errorf("Unexpected volumes and binds: %#v %#v. Container was: %#v", volumes, binds, container)
}
for ix, volume := range container.VolumeMounts {
expectedBind := "/exports/" + volume.Name + ":" + volume.MountPath
if volume.ReadOnly {
expectedBind = expectedBind + ":ro"
}
if binds[ix] != expectedBind {
t.Errorf("Unexpected bind. Expected %s. Found %s", expectedBind, binds[ix])
}
if _, ok := volumes[volume.MountPath]; !ok {
t.Errorf("Map is missing key: %s. %#v", volume.MountPath, volumes)
}
}
}
func TestMakePortsAndBindings(t *testing.T) {
container := api.Container{
Ports: []api.Port{
api.Port{
ContainerPort: 80,
HostPort: 8080,
},
api.Port{
ContainerPort: 443,
HostPort: 443,
},
},
}
exposedPorts, bindings := makePortsAndBindings(&container)
if len(container.Ports) != len(exposedPorts) ||
len(container.Ports) != len(bindings) {
t.Errorf("Unexpected ports and bindings, %#v %#v %#v", container, exposedPorts, bindings)
}
}
func TestExtractFromNonExistentFile(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
lastData := []byte{1, 2, 3}
data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel)
if err == nil {
t.Error("Unexpected non-error.")
}
if !bytes.Equal(data, lastData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
}
}
func TestExtractFromBadDataFile(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
lastData := []byte{1, 2, 3}
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
file.Close()
ioutil.WriteFile(name, lastData, 0755)
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
if err == nil {
t.Error("Unexpected non-error.")
}
if !bytes.Equal(data, lastData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
}
}
func TestExtractFromSameDataFile(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
manifest := api.ContainerManifest{
Id: "foo",
}
lastData, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, lastData, 0755)
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
expectNoError(t, err)
if !bytes.Equal(data, lastData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
}
}
func TestExtractFromChangedDataFile(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
reader := startReadingSingle(changeChannel)
oldManifest := api.ContainerManifest{
Id: "foo",
}
newManifest := api.ContainerManifest{
Id: "bar",
}
lastData, err := json.Marshal(oldManifest)
expectNoError(t, err)
newData, err := json.Marshal(newManifest)
expectNoError(t, err)
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, newData, 0755)
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
close(changeChannel)
expectNoError(t, err)
if !bytes.Equal(data, newData) {
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
}
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected channel traffic: %#v", read)
}
if !reflect.DeepEqual(read[0], newManifest) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0])
}
}
func TestExtractFromHttpBadness(t *testing.T) {
kubelet := Kubelet{}
lastData := []byte{1, 2, 3}
changeChannel := make(chan api.ContainerManifest)
data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel)
if err == nil {
t.Error("Unexpected non-error.")
}
if !bytes.Equal(lastData, data) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
}
}
func TestExtractFromHttpNoChange(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
manifest := api.ContainerManifest{
Id: "foo",
}
lastData, err := json.Marshal(manifest)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(lastData),
}
testServer := httptest.NewServer(&fakeHandler)
data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if !bytes.Equal(lastData, data) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
}
}
func TestExtractFromHttpChanges(t *testing.T) {
kubelet := Kubelet{}
changeChannel := make(chan api.ContainerManifest)
reader := startReadingSingle(changeChannel)
manifest := api.ContainerManifest{
Id: "foo",
}
newManifest := api.ContainerManifest{
Id: "bar",
}
lastData, _ := json.Marshal(manifest)
newData, _ := json.Marshal(newManifest)
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(newData),
}
testServer := httptest.NewServer(&fakeHandler)
data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel)
close(changeChannel)
read := reader.GetList()
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
}
if !bytes.Equal(newData, data) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
}
if !reflect.DeepEqual(newManifest, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0])
}
}
func TestWatchEtcd(t *testing.T) {
watchChannel := make(chan *etcd.Response)
changeChannel := make(chan []api.ContainerManifest)
kubelet := Kubelet{}
reader := startReading(changeChannel)
manifest := []api.ContainerManifest{
api.ContainerManifest{
Id: "foo",
},
}
data, err := json.Marshal(manifest)
expectNoError(t, err)
go kubelet.WatchEtcd(watchChannel, changeChannel)
watchChannel <- &etcd.Response{
Node: &etcd.Node{
Value: string(data),
},
}
close(watchChannel)
close(changeChannel)
read := reader.GetList()
if len(read) != 1 ||
!reflect.DeepEqual(read[0], manifest) {
t.Errorf("Unexpected manifest(s) %#v %#v", read[0], manifest)
}
}

View File

@ -30,8 +30,9 @@ type EtcdResponseWithError struct {
type FakeEtcdClient struct {
Data map[string]EtcdResponseWithError
deletedKeys []string
err error
Err error
t *testing.T
Ix int
}
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
@ -42,7 +43,8 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
}
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
return f.Set(key, data, ttl)
f.Ix = f.Ix + 1
return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
}
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
@ -63,14 +65,14 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err
},
}
f.Data[key] = result
return result.R, f.err
return result.R, f.Err
}
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
return f.Set(key, value, ttl)
}
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
f.deletedKeys = append(f.deletedKeys, key)
return &etcd.Response{}, f.err
return &etcd.Response{}, f.Err
}
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {