mirror of https://github.com/k3s-io/k3s
Merge pull request #77 from brendandburns/kublet
Expand unit tests, coverage now to 56.9%pull/6/head
commit
e1a5c6268e
|
@ -21,10 +21,12 @@ import (
|
|||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -237,18 +239,15 @@ func dockerNameToManifestAndContainer(name string) (manifestId, containerName st
|
|||
return
|
||||
}
|
||||
|
||||
func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) {
|
||||
err = kl.pullImage(container.Image)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
name = manifestAndContainerToDockerName(manifest, container)
|
||||
envVariables := []string{}
|
||||
func makeEnvironmentVariables(container *api.Container) []string {
|
||||
var result []string
|
||||
for _, value := range container.Env {
|
||||
envVariables = append(envVariables, fmt.Sprintf("%s=%s", value.Name, value.Value))
|
||||
result = append(result, fmt.Sprintf("%s=%s", value.Name, value.Value))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func makeVolumesAndBinds(container *api.Container) (map[string]struct{}, []string) {
|
||||
volumes := map[string]struct{}{}
|
||||
binds := []string{}
|
||||
for _, volume := range container.VolumeMounts {
|
||||
|
@ -259,7 +258,10 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
|
|||
}
|
||||
binds = append(binds, basePath)
|
||||
}
|
||||
return volumes, binds
|
||||
}
|
||||
|
||||
func makePortsAndBindings(container *api.Container) (map[docker.Port]struct{}, map[docker.Port][]docker.PortBinding) {
|
||||
exposedPorts := map[docker.Port]struct{}{}
|
||||
portBindings := map[docker.Port][]docker.PortBinding{}
|
||||
for _, port := range container.Ports {
|
||||
|
@ -275,10 +277,24 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
|
|||
},
|
||||
}
|
||||
}
|
||||
return exposedPorts, portBindings
|
||||
}
|
||||
|
||||
func makeCommandLine(container *api.Container) []string {
|
||||
var cmdList []string
|
||||
if len(container.Command) > 0 {
|
||||
cmdList = strings.Split(container.Command, " ")
|
||||
}
|
||||
return cmdList
|
||||
}
|
||||
|
||||
func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.Container) (name string, err error) {
|
||||
name = manifestAndContainerToDockerName(manifest, container)
|
||||
|
||||
envVariables := makeEnvironmentVariables(container)
|
||||
volumes, binds := makeVolumesAndBinds(container)
|
||||
exposedPorts, portBindings := makePortsAndBindings(container)
|
||||
|
||||
opts := docker.CreateContainerOptions{
|
||||
Name: name,
|
||||
Config: &docker.Config{
|
||||
|
@ -287,7 +303,7 @@ func (kl *Kubelet) RunContainer(manifest *api.ContainerManifest, container *api.
|
|||
Env: envVariables,
|
||||
Volumes: volumes,
|
||||
WorkingDir: container.WorkingDir,
|
||||
Cmd: cmdList,
|
||||
Cmd: makeCommandLine(container),
|
||||
},
|
||||
}
|
||||
dockerContainer, err := kl.DockerClient.CreateContainer(opts)
|
||||
|
@ -320,76 +336,78 @@ func (kl *Kubelet) KillContainer(name string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (kl *Kubelet) extractFromFile(lastData []byte, name string, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
|
||||
var file *os.File
|
||||
var err error
|
||||
if file, err = os.Open(name); err != nil {
|
||||
return lastData, err
|
||||
}
|
||||
|
||||
return kl.extractFromReader(lastData, file, changeChannel)
|
||||
}
|
||||
|
||||
func (kl *Kubelet) extractFromReader(lastData []byte, reader io.Reader, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
|
||||
var manifest api.ContainerManifest
|
||||
data, err := ioutil.ReadAll(reader)
|
||||
if err != nil {
|
||||
log.Printf("Couldn't read file: %v", err)
|
||||
return lastData, err
|
||||
}
|
||||
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
|
||||
return lastData, err
|
||||
}
|
||||
if !bytes.Equal(lastData, data) {
|
||||
lastData = data
|
||||
// Ok, we have a valid configuration, send to channel for
|
||||
// rejiggering.
|
||||
changeChannel <- manifest
|
||||
return data, nil
|
||||
}
|
||||
return lastData, nil
|
||||
}
|
||||
|
||||
// Watch a file for changes to the set of pods that should run on this Kubelet
|
||||
// This function loops forever and is intended to be run as a goroutine
|
||||
func (kl *Kubelet) WatchFile(file string, changeChannel chan<- api.ContainerManifest) {
|
||||
var lastData []byte
|
||||
for {
|
||||
var err error
|
||||
time.Sleep(kl.FileCheckFrequency)
|
||||
var manifest api.ContainerManifest
|
||||
data, err := ioutil.ReadFile(file)
|
||||
lastData, err = kl.extractFromFile(lastData, file, changeChannel)
|
||||
if err != nil {
|
||||
log.Printf("Couldn't read file: %s : %v", file, err)
|
||||
continue
|
||||
}
|
||||
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(lastData, data) {
|
||||
lastData = data
|
||||
// Ok, we have a valid configuration, send to channel for
|
||||
// rejiggering.
|
||||
changeChannel <- manifest
|
||||
continue
|
||||
log.Printf("Error polling file: %#v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (kl *Kubelet) extractFromHTTP(lastData []byte, url string, changeChannel chan<- api.ContainerManifest) ([]byte, error) {
|
||||
client := &http.Client{}
|
||||
request, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return lastData, err
|
||||
}
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
return lastData, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
return kl.extractFromReader(lastData, response.Body, changeChannel)
|
||||
}
|
||||
|
||||
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
|
||||
// This function runs forever and is intended to be run as a goroutine
|
||||
func (kl *Kubelet) WatchHTTP(url string, changeChannel chan<- api.ContainerManifest) {
|
||||
var lastData []byte
|
||||
client := &http.Client{}
|
||||
for {
|
||||
var err error
|
||||
time.Sleep(kl.HTTPCheckFrequency)
|
||||
var config api.ContainerManifest
|
||||
data, err := kl.SyncHTTP(client, url, &config)
|
||||
log.Printf("Containers: %#v", config)
|
||||
lastData, err = kl.extractFromHTTP(lastData, url, changeChannel)
|
||||
if err != nil {
|
||||
log.Printf("Error syncing HTTP: %#v", err)
|
||||
continue
|
||||
}
|
||||
if !bytes.Equal(lastData, data) {
|
||||
lastData = data
|
||||
changeChannel <- config
|
||||
continue
|
||||
log.Printf("Error syncing http: %#v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SyncHTTP reads from url a yaml manifest and populates config. Returns the
|
||||
// raw bytes, if something was read. Returns an error if something goes wrong.
|
||||
// 'client' is used to execute the request, to allow caching of clients.
|
||||
func (kl *Kubelet) SyncHTTP(client *http.Client, url string, config *api.ContainerManifest) ([]byte, error) {
|
||||
request, err := http.NewRequest("GET", url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer response.Body.Close()
|
||||
body, err := ioutil.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = kl.ExtractYAMLData(body, &config); err != nil {
|
||||
return body, err
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
// Take an etcd Response object, and turn it into a structured list of containers
|
||||
// Return a list of containers, or an error if one occurs.
|
||||
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
|
||||
|
@ -473,30 +491,32 @@ func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
|
||||
var manifests []api.ContainerManifest
|
||||
if response.Node == nil || len(response.Node.Value) == 0 {
|
||||
return manifests, fmt.Errorf("No nodes field: %#v", response)
|
||||
}
|
||||
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
|
||||
return manifests, err
|
||||
}
|
||||
|
||||
// Watch etcd for changes, receives config objects from the etcd client watch.
|
||||
// This function loops forever and is intended to be run as a goroutine.
|
||||
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
|
||||
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, changeChannel chan<- []api.ContainerManifest) {
|
||||
defer util.HandleCrash()
|
||||
for {
|
||||
watchResponse := <-watchChannel
|
||||
log.Printf("Got change: %#v", watchResponse)
|
||||
|
||||
// This means the channel has been closed.
|
||||
if watchResponse == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if watchResponse.Node == nil || len(watchResponse.Node.Value) == 0 {
|
||||
log.Printf("No nodes field: %#v", watchResponse)
|
||||
if watchResponse.Node != nil {
|
||||
log.Printf("Node: %#v", watchResponse.Node)
|
||||
}
|
||||
}
|
||||
log.Printf("Got data: %v", watchResponse.Node.Value)
|
||||
var manifests []api.ContainerManifest
|
||||
if err := kl.ExtractYAMLData([]byte(watchResponse.Node.Value), &manifests); err != nil {
|
||||
manifests, err := kl.extractFromEtcd(watchResponse)
|
||||
if err != nil {
|
||||
log.Printf("Error handling response from etcd: %#v", err)
|
||||
continue
|
||||
}
|
||||
log.Printf("manifests: %#v", manifests)
|
||||
// Ok, we have a valid configuration, send to channel for
|
||||
// rejiggering.
|
||||
changeChannel <- manifests
|
||||
|
@ -518,6 +538,11 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|||
}
|
||||
if !exists {
|
||||
log.Printf("%#v doesn't exist, creating", element)
|
||||
err = kl.pullImage(element.Image)
|
||||
if err != nil {
|
||||
log.Printf("Error pulling container: %#v", err)
|
||||
continue
|
||||
}
|
||||
actualName, err = kl.RunContainer(&manifest, &element)
|
||||
// For some reason, list gives back names that start with '/'
|
||||
actualName = "/" + actualName
|
||||
|
|
|
@ -16,10 +16,13 @@ limitations under the License.
|
|||
package kubelet
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"io/ioutil"
|
||||
"net/http/httptest"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
|
@ -191,6 +194,14 @@ func TestContainerExists(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
|
||||
fakeDocker.clearCalls()
|
||||
missingManifest := api.ContainerManifest{Id: "foobar"}
|
||||
exists, _, err = kubelet.ContainerExists(&missingManifest, &container)
|
||||
verifyCalls(t, fakeDocker, []string{"list"})
|
||||
if exists {
|
||||
t.Errorf("Failed to not find container %#v, missingManifest")
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetContainerID(t *testing.T) {
|
||||
|
@ -322,42 +333,6 @@ func TestKillContainer(t *testing.T) {
|
|||
verifyCalls(t, fakeDocker, []string{"list", "stop"})
|
||||
}
|
||||
|
||||
func TestSyncHTTP(t *testing.T) {
|
||||
containers := api.ContainerManifest{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "foo",
|
||||
Image: "dockerfile/foo",
|
||||
},
|
||||
{
|
||||
Name: "bar",
|
||||
Image: "dockerfile/bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(containers)
|
||||
fakeHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(data),
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
kubelet := Kubelet{}
|
||||
|
||||
var containersOut api.ContainerManifest
|
||||
data, err := kubelet.SyncHTTP(&http.Client{}, testServer.URL, &containersOut)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
if len(containers.Containers) != len(containersOut.Containers) {
|
||||
t.Errorf("Container sizes don't match. Expected: %d Received %d, %#v", len(containers.Containers), len(containersOut.Containers), containersOut)
|
||||
}
|
||||
expectedData, _ := json.Marshal(containers)
|
||||
actualData, _ := json.Marshal(containersOut)
|
||||
if string(expectedData) != string(actualData) {
|
||||
t.Errorf("Container data doesn't match. Expected: %s Received %s", string(expectedData), string(actualData))
|
||||
}
|
||||
}
|
||||
|
||||
func TestResponseToContainersNil(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil})
|
||||
|
@ -560,3 +535,331 @@ func TestSyncManifestsDeletes(t *testing.T) {
|
|||
t.Errorf("Unexpected call sequence: %#v", fakeDocker.called)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventWriting(t *testing.T) {
|
||||
fakeEtcd := registry.MakeFakeEtcdClient(t)
|
||||
kubelet := &Kubelet{
|
||||
Client: fakeEtcd,
|
||||
}
|
||||
expectedEvent := api.Event{
|
||||
Event: "test",
|
||||
Container: &api.Container{
|
||||
Name: "foo",
|
||||
},
|
||||
}
|
||||
err := kubelet.LogEvent(&expectedEvent)
|
||||
expectNoError(t, err)
|
||||
if fakeEtcd.Ix != 1 {
|
||||
t.Errorf("Unexpected number of children added: %d, expected 1", fakeEtcd.Ix)
|
||||
}
|
||||
response, err := fakeEtcd.Get("/events/foo/1", false, false)
|
||||
expectNoError(t, err)
|
||||
var event api.Event
|
||||
err = json.Unmarshal([]byte(response.Node.Value), &event)
|
||||
expectNoError(t, err)
|
||||
if event.Event != expectedEvent.Event ||
|
||||
event.Container.Name != expectedEvent.Container.Name {
|
||||
t.Errorf("Event's don't match. Expected: %#v Saw: %#v", expectedEvent, event)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventWritingError(t *testing.T) {
|
||||
fakeEtcd := registry.MakeFakeEtcdClient(t)
|
||||
kubelet := &Kubelet{
|
||||
Client: fakeEtcd,
|
||||
}
|
||||
fakeEtcd.Err = fmt.Errorf("Test error")
|
||||
err := kubelet.LogEvent(&api.Event{
|
||||
Event: "test",
|
||||
Container: &api.Container{
|
||||
Name: "foo",
|
||||
},
|
||||
})
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected non-error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeCommandLine(t *testing.T) {
|
||||
expected := []string{"echo", "hello", "world"}
|
||||
container := api.Container{
|
||||
Command: strings.Join(expected, " "),
|
||||
}
|
||||
cmdLine := makeCommandLine(&container)
|
||||
if !reflect.DeepEqual(expected, cmdLine) {
|
||||
t.Error("Unexpected command line. Expected %#v, got %#v", expected, cmdLine)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeEnvVariables(t *testing.T) {
|
||||
container := api.Container{
|
||||
Env: []api.EnvVar{
|
||||
api.EnvVar{
|
||||
Name: "foo",
|
||||
Value: "bar",
|
||||
},
|
||||
api.EnvVar{
|
||||
Name: "baz",
|
||||
Value: "blah",
|
||||
},
|
||||
},
|
||||
}
|
||||
vars := makeEnvironmentVariables(&container)
|
||||
if len(vars) != len(container.Env) {
|
||||
t.Errorf("Vars don't match. Expected: %#v Found: %#v", container.Env, vars)
|
||||
}
|
||||
for ix, env := range container.Env {
|
||||
value := fmt.Sprintf("%s=%s", env.Name, env.Value)
|
||||
if value != vars[ix] {
|
||||
t.Errorf("Unexpected value: %s. Expected: %s", vars[ix], value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeVolumesAndBinds(t *testing.T) {
|
||||
container := api.Container{
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
api.VolumeMount{
|
||||
MountPath: "/mnt/path",
|
||||
Name: "disk",
|
||||
ReadOnly: false,
|
||||
},
|
||||
api.VolumeMount{
|
||||
MountPath: "/mnt/path2",
|
||||
Name: "disk2",
|
||||
ReadOnly: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
volumes, binds := makeVolumesAndBinds(&container)
|
||||
if len(volumes) != len(container.VolumeMounts) ||
|
||||
len(binds) != len(container.VolumeMounts) {
|
||||
t.Errorf("Unexpected volumes and binds: %#v %#v. Container was: %#v", volumes, binds, container)
|
||||
}
|
||||
for ix, volume := range container.VolumeMounts {
|
||||
expectedBind := "/exports/" + volume.Name + ":" + volume.MountPath
|
||||
if volume.ReadOnly {
|
||||
expectedBind = expectedBind + ":ro"
|
||||
}
|
||||
if binds[ix] != expectedBind {
|
||||
t.Errorf("Unexpected bind. Expected %s. Found %s", expectedBind, binds[ix])
|
||||
}
|
||||
if _, ok := volumes[volume.MountPath]; !ok {
|
||||
t.Errorf("Map is missing key: %s. %#v", volume.MountPath, volumes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakePortsAndBindings(t *testing.T) {
|
||||
container := api.Container{
|
||||
Ports: []api.Port{
|
||||
api.Port{
|
||||
ContainerPort: 80,
|
||||
HostPort: 8080,
|
||||
},
|
||||
api.Port{
|
||||
ContainerPort: 443,
|
||||
HostPort: 443,
|
||||
},
|
||||
},
|
||||
}
|
||||
exposedPorts, bindings := makePortsAndBindings(&container)
|
||||
if len(container.Ports) != len(exposedPorts) ||
|
||||
len(container.Ports) != len(bindings) {
|
||||
t.Errorf("Unexpected ports and bindings, %#v %#v %#v", container, exposedPorts, bindings)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromNonExistentFile(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
lastData := []byte{1, 2, 3}
|
||||
data, err := kubelet.extractFromFile(lastData, "/some/fake/file", changeChannel)
|
||||
if err == nil {
|
||||
t.Error("Unexpected non-error.")
|
||||
}
|
||||
if !bytes.Equal(data, lastData) {
|
||||
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromBadDataFile(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
lastData := []byte{1, 2, 3}
|
||||
file, err := ioutil.TempFile("", "foo")
|
||||
expectNoError(t, err)
|
||||
name := file.Name()
|
||||
file.Close()
|
||||
ioutil.WriteFile(name, lastData, 0755)
|
||||
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
|
||||
|
||||
if err == nil {
|
||||
t.Error("Unexpected non-error.")
|
||||
}
|
||||
if !bytes.Equal(data, lastData) {
|
||||
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromSameDataFile(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
manifest := api.ContainerManifest{
|
||||
Id: "foo",
|
||||
}
|
||||
lastData, err := json.Marshal(manifest)
|
||||
expectNoError(t, err)
|
||||
file, err := ioutil.TempFile("", "foo")
|
||||
expectNoError(t, err)
|
||||
name := file.Name()
|
||||
expectNoError(t, file.Close())
|
||||
ioutil.WriteFile(name, lastData, 0755)
|
||||
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
|
||||
|
||||
expectNoError(t, err)
|
||||
if !bytes.Equal(data, lastData) {
|
||||
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromChangedDataFile(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
reader := startReadingSingle(changeChannel)
|
||||
oldManifest := api.ContainerManifest{
|
||||
Id: "foo",
|
||||
}
|
||||
newManifest := api.ContainerManifest{
|
||||
Id: "bar",
|
||||
}
|
||||
lastData, err := json.Marshal(oldManifest)
|
||||
expectNoError(t, err)
|
||||
newData, err := json.Marshal(newManifest)
|
||||
expectNoError(t, err)
|
||||
file, err := ioutil.TempFile("", "foo")
|
||||
expectNoError(t, err)
|
||||
name := file.Name()
|
||||
expectNoError(t, file.Close())
|
||||
ioutil.WriteFile(name, newData, 0755)
|
||||
data, err := kubelet.extractFromFile(lastData, name, changeChannel)
|
||||
close(changeChannel)
|
||||
|
||||
expectNoError(t, err)
|
||||
if !bytes.Equal(data, newData) {
|
||||
t.Errorf("Unexpected data response. Expected %#v, found %#v", lastData, data)
|
||||
}
|
||||
read := reader.GetList()
|
||||
if len(read) != 1 {
|
||||
t.Errorf("Unexpected channel traffic: %#v", read)
|
||||
}
|
||||
if !reflect.DeepEqual(read[0], newManifest) {
|
||||
t.Errorf("Unexpected difference. Expected %#v, got %#v", newManifest, read[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromHttpBadness(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
lastData := []byte{1, 2, 3}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
data, err := kubelet.extractFromHTTP(lastData, "http://localhost:12345", changeChannel)
|
||||
if err == nil {
|
||||
t.Error("Unexpected non-error.")
|
||||
}
|
||||
if !bytes.Equal(lastData, data) {
|
||||
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromHttpNoChange(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
|
||||
manifest := api.ContainerManifest{
|
||||
Id: "foo",
|
||||
}
|
||||
lastData, err := json.Marshal(manifest)
|
||||
|
||||
fakeHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(lastData),
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
|
||||
data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
if !bytes.Equal(lastData, data) {
|
||||
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
|
||||
}
|
||||
}
|
||||
|
||||
func TestExtractFromHttpChanges(t *testing.T) {
|
||||
kubelet := Kubelet{}
|
||||
changeChannel := make(chan api.ContainerManifest)
|
||||
reader := startReadingSingle(changeChannel)
|
||||
|
||||
manifest := api.ContainerManifest{
|
||||
Id: "foo",
|
||||
}
|
||||
newManifest := api.ContainerManifest{
|
||||
Id: "bar",
|
||||
}
|
||||
lastData, _ := json.Marshal(manifest)
|
||||
newData, _ := json.Marshal(newManifest)
|
||||
fakeHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(newData),
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
|
||||
data, err := kubelet.extractFromHTTP(lastData, testServer.URL, changeChannel)
|
||||
close(changeChannel)
|
||||
|
||||
read := reader.GetList()
|
||||
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %#v", err)
|
||||
}
|
||||
if len(read) != 1 {
|
||||
t.Errorf("Unexpected list: %#v", read)
|
||||
}
|
||||
if !bytes.Equal(newData, data) {
|
||||
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", lastData, data)
|
||||
}
|
||||
if !reflect.DeepEqual(newManifest, read[0]) {
|
||||
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", newManifest, read[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestWatchEtcd(t *testing.T) {
|
||||
watchChannel := make(chan *etcd.Response)
|
||||
changeChannel := make(chan []api.ContainerManifest)
|
||||
kubelet := Kubelet{}
|
||||
reader := startReading(changeChannel)
|
||||
|
||||
manifest := []api.ContainerManifest{
|
||||
api.ContainerManifest{
|
||||
Id: "foo",
|
||||
},
|
||||
}
|
||||
data, err := json.Marshal(manifest)
|
||||
expectNoError(t, err)
|
||||
|
||||
go kubelet.WatchEtcd(watchChannel, changeChannel)
|
||||
|
||||
watchChannel <- &etcd.Response{
|
||||
Node: &etcd.Node{
|
||||
Value: string(data),
|
||||
},
|
||||
}
|
||||
close(watchChannel)
|
||||
close(changeChannel)
|
||||
|
||||
read := reader.GetList()
|
||||
if len(read) != 1 ||
|
||||
!reflect.DeepEqual(read[0], manifest) {
|
||||
t.Errorf("Unexpected manifest(s) %#v %#v", read[0], manifest)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,8 +30,9 @@ type EtcdResponseWithError struct {
|
|||
type FakeEtcdClient struct {
|
||||
Data map[string]EtcdResponseWithError
|
||||
deletedKeys []string
|
||||
err error
|
||||
Err error
|
||||
t *testing.T
|
||||
Ix int
|
||||
}
|
||||
|
||||
func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
|
||||
|
@ -42,7 +43,8 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient {
|
|||
}
|
||||
|
||||
func (f *FakeEtcdClient) AddChild(key, data string, ttl uint64) (*etcd.Response, error) {
|
||||
return f.Set(key, data, ttl)
|
||||
f.Ix = f.Ix + 1
|
||||
return f.Set(fmt.Sprintf("%s/%d", key, f.Ix), data, ttl)
|
||||
}
|
||||
|
||||
func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, error) {
|
||||
|
@ -63,14 +65,14 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err
|
|||
},
|
||||
}
|
||||
f.Data[key] = result
|
||||
return result.R, f.err
|
||||
return result.R, f.Err
|
||||
}
|
||||
func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response, error) {
|
||||
return f.Set(key, value, ttl)
|
||||
}
|
||||
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
|
||||
f.deletedKeys = append(f.deletedKeys, key)
|
||||
return &etcd.Response{}, f.err
|
||||
return &etcd.Response{}, f.Err
|
||||
}
|
||||
|
||||
func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) {
|
||||
|
|
Loading…
Reference in New Issue