Switch the Kubelet to use kubelet/config

Also transfer the Kubelet from using ContainerManifest.ID to source specific
identifiers with namespacing.  Move goroutine behavior out of kubelet/ and
into integration.go and cmd/kubelet/kubelet.go for better isolation.
pull/6/head
Clayton Coleman 2014-07-15 16:24:41 -04:00
parent 09294b90ce
commit 7767c2a2ac
7 changed files with 329 additions and 909 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/master"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -85,6 +86,9 @@ func startComponents(manifestURL string) (apiServerURL string) {
handler := delegateHandler{}
apiserver := httptest.NewServer(&handler)
etcdClient := etcd.NewClient(servers)
cl := client.New(apiserver.URL, nil)
cl.PollPeriod = time.Second * 1
cl.Sync = true
@ -93,32 +97,39 @@ func startComponents(manifestURL string) (apiServerURL string) {
m := master.New(servers, machineList, fakePodInfoGetter{}, nil, "", cl)
handler.delegate = m.ConstructHandler("/api/v1beta1")
controllerManager := controller.MakeReplicationManager(etcd.NewClient(servers), cl)
controllerManager := controller.MakeReplicationManager(etcdClient, cl)
controllerManager.Run(1 * time.Second)
// Kubelet
myKubelet := kubelet.Kubelet{
Hostname: machineList[0],
DockerClient: &fakeDocker1,
DockerPuller: &kubelet.FakeDockerPuller{},
FileCheckFrequency: 5 * time.Second,
SyncFrequency: 5 * time.Second,
HTTPCheckFrequency: 5 * time.Second,
// Kubelet (localhost)
cfg1 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[0]), etcdClient, 30*time.Second, cfg1.Channel("etcd"))
config.NewSourceURL(manifestURL, 5*time.Second, cfg1.Channel("url"))
myKubelet := &kubelet.Kubelet{
Hostname: machineList[0],
DockerClient: &fakeDocker1,
DockerPuller: &kubelet.FakeDockerPuller{},
}
go myKubelet.RunKubelet("", "", manifestURL, servers, "localhost", 10250)
go util.Forever(func() { myKubelet.Run(cfg1.Updates()) }, 0)
go util.Forever(cfg1.Sync, 3*time.Second)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(myKubelet, cfg1.Channel("http"), http.DefaultServeMux, "localhost", 10250)
}, 0)
// Kubelet (machine)
// Create a second kubelet so that the guestbook example's two redis slaves both
// have a place they can schedule.
otherKubelet := kubelet.Kubelet{
Hostname: machineList[1],
DockerClient: &fakeDocker2,
DockerPuller: &kubelet.FakeDockerPuller{},
FileCheckFrequency: 5 * time.Second,
SyncFrequency: 5 * time.Second,
HTTPCheckFrequency: 5 * time.Second,
cfg2 := config.NewPodConfig(config.PodConfigNotificationSnapshotAndUpdates)
config.NewSourceEtcd(config.EtcdKeyForHost(machineList[1]), etcdClient, 30*time.Second, cfg2.Channel("etcd"))
otherKubelet := &kubelet.Kubelet{
Hostname: machineList[1],
DockerClient: &fakeDocker2,
DockerPuller: &kubelet.FakeDockerPuller{},
}
go otherKubelet.RunKubelet("", "", "", servers, "localhost", 10251)
go util.Forever(func() { otherKubelet.Run(cfg2.Updates()) }, 0)
go util.Forever(cfg2.Sync, 3*time.Second)
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(otherKubelet, cfg2.Channel("http"), http.DefaultServeMux, "localhost", 10251)
}, 0)
return apiserver.URL
}

View File

@ -23,16 +23,20 @@ package main
import (
"flag"
"math/rand"
"net/http"
"os"
"os/exec"
"strings"
"time"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
kconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/google/cadvisor/client"
)
var (
@ -77,7 +81,7 @@ func getHostname() string {
}
hostname = fqdn
}
return string(hostname)
return strings.TrimSpace(string(hostname))
}
func main() {
@ -93,12 +97,57 @@ func main() {
glog.Fatal("Couldn't connect to docker.")
}
k := kubelet.Kubelet{
Hostname: getHostname(),
DockerClient: dockerClient,
FileCheckFrequency: *fileCheckFrequency,
SyncFrequency: *syncFrequency,
HTTPCheckFrequency: *httpCheckFrequency,
cadvisorClient, err := cadvisor.NewClient("http://127.0.0.1:5000")
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
}
k.RunKubelet(*dockerEndpoint, *config, *manifestURL, etcdServerList, *address, *port)
hostname := getHostname()
k := &kubelet.Kubelet{
Hostname: hostname,
DockerClient: dockerClient,
CadvisorClient: cadvisorClient,
}
// source of all configuration
cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)
// define file config source
if *config != "" {
kconfig.NewSourceFile(*config, *fileCheckFrequency, cfg.Channel("file"))
}
// define url config source
if *manifestURL != "" {
kconfig.NewSourceURL(*manifestURL, *httpCheckFrequency, cfg.Channel("http"))
}
// define etcd config source and initialize etcd client
if len(etcdServerList) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServerList)
k.EtcdClient = etcd.NewClient(etcdServerList)
kconfig.NewSourceEtcd(kconfig.EtcdKeyForHost(hostname), k.EtcdClient, 30*time.Second, cfg.Channel("etcd"))
}
// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
// up into "per source" synchronizations
// start the kubelet
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)
// resynchronize periodically
// TODO: make this part of PodConfig so that it is only delivered after syncFrequency has elapsed without
// an update
go util.Forever(cfg.Sync, *syncFrequency)
// start the kubelet server
if *address != "" {
go util.Forever(func() {
kubelet.ListenAndServeKubeletServer(k, cfg.Channel("http"), http.DefaultServeMux, *address, *port)
}, 0)
}
// runs forever
select {}
}

View File

@ -174,14 +174,14 @@ func unescapeDash(in string) (out string) {
const containerNamePrefix = "k8s"
// Creates a name which can be reversed to identify both manifest id and container name.
func buildDockerName(manifest *api.ContainerManifest, container *api.Container) string {
func buildDockerName(pod *Pod, container *api.Container) string {
// Note, manifest.ID could be blank.
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(manifest.ID), rand.Uint32())
return fmt.Sprintf("%s--%s--%s--%08x", containerNamePrefix, escapeDash(container.Name), escapeDash(GetPodFullName(pod)), rand.Uint32())
}
// Upacks a container name, returning the manifest id and container name we would have used to
// construct the docker name. If the docker name isn't one we created, we may return empty strings.
func parseDockerName(name string) (manifestID, containerName string) {
func parseDockerName(name string) (podFullName, containerName string) {
// For some reason docker appears to be appending '/' to names.
// If its there, strip it.
if name[0] == '/' {
@ -195,7 +195,7 @@ func parseDockerName(name string) (manifestID, containerName string) {
containerName = unescapeDash(parts[1])
}
if len(parts) > 2 {
manifestID = unescapeDash(parts[2])
podFullName = unescapeDash(parts[2])
}
return
}

View File

@ -20,13 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
@ -40,9 +34,7 @@ import (
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
"github.com/google/cadvisor/client"
"github.com/google/cadvisor/info"
"gopkg.in/v1/yaml"
)
const defaultChanSize = 1024
@ -58,6 +50,13 @@ type CadvisorInterface interface {
MachineInfo() (*info.MachineInfo, error)
}
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncPods([]Pod) error
}
type volumeMap map[string]volume.Interface
// New creates a new Kubelet.
// TODO: currently it is only called by test code.
// Need cleanup.
@ -65,94 +64,35 @@ func New() *Kubelet {
return &Kubelet{}
}
type volumeMap map[string]volume.Interface
// Kubelet is the main kubelet implementation.
type Kubelet struct {
Hostname string
EtcdClient tools.EtcdClient
DockerClient DockerInterface
DockerPuller DockerPuller
CadvisorClient CadvisorInterface
FileCheckFrequency time.Duration
SyncFrequency time.Duration
HTTPCheckFrequency time.Duration
pullLock sync.Mutex
HealthChecker health.HealthChecker
LogServer http.Handler
Hostname string
DockerClient DockerInterface
// Optional, no events will be sent without it
EtcdClient tools.EtcdClient
// Optional, no statistics will be available if omitted
CadvisorClient CadvisorInterface
// Optional, defaults to simple implementaiton
HealthChecker health.HealthChecker
// Optional, defaults to simple Docker implementation
DockerPuller DockerPuller
// Optional, defaults to /logs/ from /var/log
LogServer http.Handler
}
type manifestUpdate struct {
source string
manifests []api.ContainerManifest
}
const (
fileSource = "file"
etcdSource = "etcd"
httpClientSource = "http_client"
httpServerSource = "http_server"
)
// RunKubelet starts background goroutines. If config_path, manifest_url, or address are empty,
// they are not watched. Never returns.
func (kl *Kubelet) RunKubelet(dockerEndpoint, configPath, manifestURL string, etcdServers []string, address string, port uint) {
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
if kl.LogServer == nil {
kl.LogServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.CadvisorClient == nil {
var err error
kl.CadvisorClient, err = cadvisor.NewClient("http://127.0.0.1:5000")
if err != nil {
glog.Errorf("Error on creating cadvisor client: %v", err)
}
}
if kl.DockerPuller == nil {
kl.DockerPuller = NewDockerPuller(kl.DockerClient)
}
updateChannel := make(chan manifestUpdate)
if configPath != "" {
glog.Infof("Watching for file configs at %s", configPath)
go util.Forever(func() {
kl.WatchFiles(configPath, updateChannel)
}, kl.FileCheckFrequency)
if kl.HealthChecker == nil {
kl.HealthChecker = health.NewHealthChecker()
}
if manifestURL != "" {
glog.Infof("Watching for HTTP configs at %s", manifestURL)
go util.Forever(func() {
if err := kl.extractFromHTTP(manifestURL, updateChannel); err != nil {
glog.Errorf("Error syncing http: %v", err)
}
}, kl.HTTPCheckFrequency)
}
if len(etcdServers) > 0 {
glog.Infof("Watching for etcd configs at %v", etcdServers)
kl.EtcdClient = etcd.NewClient(etcdServers)
go util.Forever(func() { kl.SyncAndSetupEtcdWatch(updateChannel) }, 20*time.Second)
}
if address != "" {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := Server{
Kubelet: kl,
UpdateChannel: updateChannel,
DelegateHandler: http.DefaultServeMux,
}
s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
go util.Forever(func() { s.ListenAndServe() }, 0)
}
kl.HealthChecker = health.NewHealthChecker()
kl.syncLoop(updateChannel, kl)
}
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
SyncManifests([]api.ContainerManifest) error
kl.syncLoop(updates, kl)
}
// LogEvent logs an event to the etcd backend.
@ -186,7 +126,7 @@ func makeEnvironmentVariables(container *api.Container) []string {
return result
}
func makeVolumesAndBinds(manifestID string, container *api.Container, podVolumes volumeMap) (map[string]struct{}, []string) {
func makeVolumesAndBinds(pod *Pod, container *api.Container, podVolumes volumeMap) (map[string]struct{}, []string) {
volumes := map[string]struct{}{}
binds := []string{}
for _, volume := range container.VolumeMounts {
@ -201,7 +141,7 @@ func makeVolumesAndBinds(manifestID string, container *api.Container, podVolumes
// TODO(jonesdl) This clause should be deleted and an error should be thrown. The default
// behavior is now supported by the EmptyDirectory type.
volumes[volume.MountPath] = struct{}{}
basePath = fmt.Sprintf("/exports/%s/%s:%s", manifestID, volume.Name, volume.MountPath)
basePath = fmt.Sprintf("/exports/%s/%s:%s", GetPodFullName(pod), volume.Name, volume.MountPath)
}
if volume.ReadOnly {
basePath += ":ro"
@ -268,14 +208,14 @@ func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volume
return podVolumes, nil
}
// Run a single container from a manifest. Returns the docker container ID
func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) {
// Run a single container from a pod. Returns the docker container ID
func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id DockerID, err error) {
envVariables := makeEnvironmentVariables(container)
volumes, binds := makeVolumesAndBinds(manifest.ID, container, podVolumes)
volumes, binds := makeVolumesAndBinds(pod, container, podVolumes)
exposedPorts, portBindings := makePortsAndBindings(container)
opts := docker.CreateContainerOptions{
Name: buildDockerName(manifest, container),
Name: buildDockerName(pod, container),
Config: &docker.Config{
Cmd: container.Command,
Env: envVariables,
@ -301,13 +241,14 @@ func (kl *Kubelet) runContainer(manifest *api.ContainerManifest, container *api.
}
// Kill a docker container
func (kl *Kubelet) killContainer(container docker.APIContainers) error {
err := kl.DockerClient.StopContainer(container.ID, 10)
manifestID, containerName := parseDockerName(container.Names[0])
func (kl *Kubelet) killContainer(dockerContainer docker.APIContainers) error {
err := kl.DockerClient.StopContainer(dockerContainer.ID, 10)
podFullName, containerName := parseDockerName(dockerContainer.Names[0])
kl.LogEvent(&api.Event{
Event: "STOP",
Manifest: &api.ContainerManifest{
ID: manifestID,
//TODO: This should be reported using either the apiserver schema or the kubelet schema
ID: podFullName,
},
Container: &api.Container{
Name: containerName,
@ -317,247 +258,17 @@ func (kl *Kubelet) killContainer(container docker.APIContainers) error {
return err
}
func (kl *Kubelet) extractFromFile(name string) (api.ContainerManifest, error) {
var file *os.File
var err error
var manifest api.ContainerManifest
if file, err = os.Open(name); err != nil {
return manifest, err
}
defer file.Close()
data, err := ioutil.ReadAll(file)
if err != nil {
glog.Errorf("Couldn't read from file: %v", err)
return manifest, err
}
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
return manifest, err
}
return manifest, nil
}
func (kl *Kubelet) extractFromDir(name string) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
files, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return manifests, err
}
sort.Strings(files)
for _, file := range files {
manifest, err := kl.extractFromFile(file)
if err != nil {
return manifests, err
}
manifests = append(manifests, manifest)
}
return manifests, nil
}
// WatchFiles watches a file or direcory of files for changes to the set of pods that
// should run on this Kubelet.
func (kl *Kubelet) WatchFiles(configPath string, updateChannel chan<- manifestUpdate) {
statInfo, err := os.Stat(configPath)
if err != nil {
if !os.IsNotExist(err) {
glog.Errorf("Error accessing path: %v", err)
}
return
}
switch {
case statInfo.Mode().IsDir():
manifests, err := kl.extractFromDir(configPath)
if err != nil {
glog.Errorf("Error polling dir: %v", err)
return
}
updateChannel <- manifestUpdate{fileSource, manifests}
case statInfo.Mode().IsRegular():
manifest, err := kl.extractFromFile(configPath)
if err != nil {
glog.Errorf("Error polling file: %v", err)
return
}
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
default:
glog.Errorf("Error accessing config - not a directory or file")
}
}
func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
resp, err := http.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if len(data) == 0 {
return fmt.Errorf("zero-length data received from %v", url)
}
// First try as if it's a single manifest
var manifest api.ContainerManifest
singleErr := yaml.Unmarshal(data, &manifest)
if singleErr == nil && manifest.Version == "" {
// If data is a []ContainerManifest, trying to put it into a ContainerManifest
// will not give an error but also won't set any of the fields.
// Our docs say that the version field is mandatory, so using that to judge wether
// this was actually successful.
singleErr = fmt.Errorf("got blank version field")
}
if singleErr == nil {
updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
return nil
}
// That didn't work, so try an array of manifests.
var manifests []api.ContainerManifest
multiErr := yaml.Unmarshal(data, &manifests)
// We're not sure if the person reading the logs is going to care about the single or
// multiple manifest unmarshalling attempt, so we need to put both in the logs, as is
// done at the end. Hence not returning early here.
if multiErr == nil && len(manifests) > 0 && manifests[0].Version == "" {
multiErr = fmt.Errorf("got blank version field")
}
if multiErr == nil {
updateChannel <- manifestUpdate{httpClientSource, manifests}
return nil
}
return fmt.Errorf("%v: received '%v', but couldn't parse as a "+
"single manifest (%v: %+v) or as multiple manifests (%v: %+v).\n",
url, string(data), singleErr, manifest, multiErr, manifests)
}
// ResponseToManifests takes an etcd Response object, and turns it into a structured list of containers.
// It returns a list of containers, or an error if one occurs.
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
if response.Node == nil || len(response.Node.Value) == 0 {
return nil, fmt.Errorf("no nodes field: %v", response)
}
var manifests []api.ContainerManifest
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
return manifests, err
}
func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
response, err := kl.EtcdClient.Get(key, true, false)
if err != nil {
if tools.IsEtcdNotFound(err) {
return nil
}
glog.Errorf("Error on etcd get of %s: %v", key, err)
return err
}
manifests, err := kl.ResponseToManifests(response)
if err != nil {
glog.Errorf("Error parsing response (%v): %s", response, err)
return err
}
glog.Infof("Got state from etcd: %+v", manifests)
updateChannel <- manifestUpdate{etcdSource, manifests}
return nil
}
// SyncAndSetupEtcdWatch synchronizes with etcd, and sets up an etcd watch for new configurations.
// The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
key := path.Join("registry", "hosts", strings.TrimSpace(kl.Hostname), "kubelet")
// First fetch the initial configuration (watch only gives changes...)
for {
err := kl.getKubeletStateFromEtcd(key, updateChannel)
if err == nil {
// We got a successful response, etcd is up, set up the watch.
break
}
time.Sleep(30 * time.Second)
}
done := make(chan bool)
go util.Forever(func() { kl.TimeoutWatch(done) }, 0)
for {
// The etcd client will close the watch channel when it exits. So we need
// to create and service a new one every time.
watchChannel := make(chan *etcd.Response)
// We don't push this through Forever because if it dies, we just do it again in 30 secs.
// anyway.
go kl.WatchEtcd(watchChannel, updateChannel)
kl.getKubeletStateFromEtcd(key, updateChannel)
glog.V(1).Infof("Setting up a watch for configuration changes in etcd for %s", key)
kl.EtcdClient.Watch(key, 0, true, watchChannel, done)
}
}
// TimeoutWatch timeout the watch after 30 seconds.
func (kl *Kubelet) TimeoutWatch(done chan bool) {
t := time.Tick(30 * time.Second)
for _ = range t {
done <- true
}
}
// ExtractYAMLData extracts data from YAML file into a list of containers.
func (kl *Kubelet) ExtractYAMLData(buf []byte, output interface{}) error {
if err := yaml.Unmarshal(buf, output); err != nil {
glog.Errorf("Couldn't unmarshal configuration: %v", err)
return err
}
return nil
}
func (kl *Kubelet) extractFromEtcd(response *etcd.Response) ([]api.ContainerManifest, error) {
var manifests []api.ContainerManifest
if response.Node == nil || len(response.Node.Value) == 0 {
return manifests, fmt.Errorf("no nodes field: %v", response)
}
err := kl.ExtractYAMLData([]byte(response.Node.Value), &manifests)
return manifests, err
}
// WatchEtcd watches etcd for changes, receives config objects from the etcd client watch.
// This function loops until the watchChannel is closed, and is intended to be run as a goroutine.
func (kl *Kubelet) WatchEtcd(watchChannel <-chan *etcd.Response, updateChannel chan<- manifestUpdate) {
defer util.HandleCrash()
for {
watchResponse := <-watchChannel
// This means the channel has been closed.
if watchResponse == nil {
return
}
glog.Infof("Got etcd change: %v", watchResponse)
manifests, err := kl.extractFromEtcd(watchResponse)
if err != nil {
glog.Errorf("Error handling response from etcd: %v", err)
continue
}
glog.Infof("manifests: %+v", manifests)
// Ok, we have a valid configuration, send to channel for
// rejiggering.
updateChannel <- manifestUpdate{etcdSource, manifests}
}
}
const (
networkContainerName = "net"
networkContainerImage = "kubernetes/pause:latest"
)
// Create a network container for a manifest. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (DockerID, error) {
// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createNetworkContainer(pod *Pod) (DockerID, error) {
var ports []api.Port
// Docker only exports ports from the network container. Let's
// collect all of the relevant ports and export them.
for _, container := range manifest.Containers {
for _, container := range pod.Manifest.Containers {
ports = append(ports, container.Ports...)
}
container := &api.Container{
@ -566,32 +277,36 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
Ports: ports,
}
kl.DockerPuller.Pull(networkContainerImage)
return kl.runContainer(manifest, container, nil, "")
return kl.runContainer(pod, container, nil, "")
}
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
// Make sure we have a network container
func (kl *Kubelet) syncPod(pod *Pod, dockerContainers DockerContainers, keepChannel chan<- DockerID) error {
podFullName := GetPodFullName(pod)
var netID DockerID
if networkDockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, networkContainerName); found {
if networkDockerContainer, found := dockerContainers.FindPodContainer(podFullName, networkContainerName); found {
netID = DockerID(networkDockerContainer.ID)
} else {
dockerNetworkID, err := kl.createNetworkContainer(manifest)
glog.Infof("Network container doesn't exist, creating")
dockerNetworkID, err := kl.createNetworkContainer(pod)
if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping manifest %s", err, manifest.ID)
glog.Errorf("Failed to introspect network container. (%v) Skipping pod %s", err, podFullName)
return err
}
netID = dockerNetworkID
}
keepChannel <- netID
podVolumes, err := kl.mountExternalVolumes(manifest)
podVolumes, err := kl.mountExternalVolumes(&pod.Manifest)
if err != nil {
glog.Errorf("Unable to mount volumes for manifest %s: (%v)", manifest.ID, err)
glog.Errorf("Unable to mount volumes for pod %s: (%v)", podFullName, err)
}
for _, container := range manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(manifest.ID, container.Name); found {
for _, container := range pod.Manifest.Containers {
if dockerContainer, found := dockerContainers.FindPodContainer(podFullName, container.Name); found {
containerID := DockerID(dockerContainer.ID)
glog.Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.V(1).Infof("manifest %s container %s exists as %v", manifest.ID, container.Name, containerID)
glog.Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
glog.V(1).Infof("pod %s container %s exists as %v", podFullName, container.Name, containerID)
// TODO: This should probably be separated out into a separate goroutine.
healthy, err := kl.healthy(container, dockerContainer)
@ -604,22 +319,22 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainer
continue
}
glog.V(1).Infof("manifest %s container %s is unhealthy %d.", manifest.ID, container.Name, healthy)
glog.V(1).Infof("pod %s container %s is unhealthy.", podFullName, container.Name, healthy)
if err := kl.killContainer(*dockerContainer); err != nil {
glog.V(1).Infof("Failed to kill container %s: %v", containerID, err)
glog.V(1).Infof("Failed to kill container %s: %v", dockerContainer.ID, err)
continue
}
}
glog.Infof("%+v doesn't exist, creating", container)
glog.Infof("Container doesn't exist, creating %#v", container)
if err := kl.DockerPuller.Pull(container.Image); err != nil {
glog.Errorf("Failed to create container: %v skipping manifest %s container %s.", err, manifest.ID, container.Name)
glog.Errorf("Failed to pull image: %v skipping pod %s container %s.", err, podFullName, container.Name)
continue
}
containerID, err := kl.runContainer(manifest, &container, podVolumes, "container:"+string(netID))
containerID, err := kl.runContainer(pod, &container, podVolumes, "container:"+string(netID))
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running manifest %s container %s: %v", manifest.ID, container.Name, err)
glog.Errorf("Error running pod %s container %s: %v", podFullName, container.Name, err)
continue
}
keepChannel <- containerID
@ -629,9 +344,10 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerContainer
type empty struct{}
// SyncManifests synchronizes the configured list of containers (desired state) with the host current state.
func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config)
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
func (kl *Kubelet) SyncPods(pods []Pod) error {
glog.Infof("Desired [%s]: %+v", kl.Hostname, pods)
var err error
dockerIdsToKeep := map[DockerID]empty{}
keepChannel := make(chan DockerID, defaultChanSize)
waitGroup := sync.WaitGroup{}
@ -643,18 +359,18 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
}
// Check for any containers that need starting
for ix := range config {
for i := range pods {
waitGroup.Add(1)
go func(index int) {
defer util.HandleCrash()
defer waitGroup.Done()
// necessary to dereference by index here b/c otherwise the shared value
// in the for each is re-used.
err := kl.syncManifest(&config[index], dockerContainers, keepChannel)
err := kl.syncPod(&pods[index], dockerContainers, keepChannel)
if err != nil {
glog.Errorf("Error syncing manifest: %v skipping.", err)
glog.Errorf("Error syncing pod: %v skipping.", err)
}
}(ix)
}(i)
}
ch := make(chan bool)
go func() {
@ -663,7 +379,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
}
ch <- true
}()
if len(config) > 0 {
if len(pods) > 0 {
waitGroup.Wait()
}
close(keepChannel)
@ -687,69 +403,51 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
return err
}
// Check that all Port.HostPort values are unique across all manifests.
func checkHostPortConflicts(allManifests []api.ContainerManifest, newManifest *api.ContainerManifest) []error {
allErrs := []error{}
allPorts := map[int]bool{}
// filterHostPortConflicts removes pods that conflict on Port.HostPort values
func filterHostPortConflicts(pods []Pod) []Pod {
filtered := []Pod{}
ports := map[int]bool{}
extract := func(p *api.Port) int { return p.HostPort }
for i := range allManifests {
manifest := &allManifests[i]
errs := api.AccumulateUniquePorts(manifest.Containers, allPorts, extract)
if len(errs) != 0 {
allErrs = append(allErrs, errs...)
for i := range pods {
pod := &pods[i]
if errs := api.AccumulateUniquePorts(pod.Manifest.Containers, ports, extract); len(errs) != 0 {
glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs)
continue
}
filtered = append(filtered, *pod)
}
if errs := api.AccumulateUniquePorts(newManifest.Containers, allPorts, extract); len(errs) != 0 {
allErrs = append(allErrs, errs...)
}
return allErrs
return filtered
}
// syncLoop is the main loop for processing changes. It watches for changes from
// four channels (file, etcd, server, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync_frequency seconds.
// Never returns.
func (kl *Kubelet) syncLoop(updateChannel <-chan manifestUpdate, handler SyncHandler) {
last := make(map[string][]api.ContainerManifest)
// state every sync_frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
for {
var pods []Pod
select {
case u := <-updateChannel:
glog.Infof("Got configuration from %s: %+v", u.source, u.manifests)
last[u.source] = u.manifests
case <-time.After(kl.SyncFrequency):
}
case u := <-updates:
switch u.Op {
case SET:
glog.Infof("Containers changed [%s]", kl.Hostname)
pods = u.Pods
allManifests := []api.ContainerManifest{}
allIds := util.StringSet{}
for src, srcManifests := range last {
for i := range srcManifests {
allErrs := []error{}
case UPDATE:
//TODO: implement updates of containers
glog.Infof("Containers updated, not implemented [%s]", kl.Hostname)
continue
m := &srcManifests[i]
if allIds.Has(m.ID) {
allErrs = append(allErrs, api.ValidationError{api.ErrTypeDuplicate, "ContainerManifest.ID", m.ID})
} else {
allIds.Insert(m.ID)
}
if errs := api.ValidateManifest(m); len(errs) != 0 {
allErrs = append(allErrs, errs...)
}
// Check for host-wide HostPort conflicts.
if errs := checkHostPortConflicts(allManifests, m); len(errs) != 0 {
allErrs = append(allErrs, errs...)
}
if len(allErrs) > 0 {
glog.Warningf("Manifest from %s failed validation, ignoring: %v", src, allErrs)
}
default:
panic("syncLoop does not support incremental changes")
}
// TODO(thockin): There's no reason to collect manifests by value. Don't pessimize.
allManifests = append(allManifests, srcManifests...)
}
err := handler.SyncManifests(allManifests)
pods = filterHostPortConflicts(pods)
err := handler.SyncPods(pods)
if err != nil {
glog.Errorf("Couldn't sync containers : %v", err)
}
@ -778,12 +476,12 @@ func (kl *Kubelet) statsFromContainerPath(containerPath string, req *info.Contai
}
// GetPodInfo returns information from Docker about the containers in a pod
func (kl *Kubelet) GetPodInfo(manifestID string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, manifestID)
func (kl *Kubelet) GetPodInfo(podFullName string) (api.PodInfo, error) {
return getDockerPodInfo(kl.DockerClient, podFullName)
}
// GetContainerInfo returns stats (from Cadvisor) for a container.
func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
func (kl *Kubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
if kl.CadvisorClient == nil {
return nil, nil
}
@ -791,7 +489,7 @@ func (kl *Kubelet) GetContainerInfo(manifestID, containerName string, req *info.
if err != nil {
return nil, err
}
dockerContainer, found := dockerContainers.FindPodContainer(manifestID, containerName)
dockerContainer, found := dockerContainers.FindPodContainer(podFullName, containerName)
if !found {
return nil, errors.New("couldn't find container")
}

View File

@ -19,8 +19,6 @@ package kubelet
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http/httptest"
"reflect"
"sync"
"testing"
@ -28,9 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/health"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/coreos/go-etcd/etcd"
"github.com/fsouza/go-dockerclient"
"github.com/google/cadvisor/info"
"github.com/stretchr/testify/mock"
@ -43,29 +39,6 @@ func expectNoError(t *testing.T, err error) {
}
}
// These are used for testing extract json (below)
type TestData struct {
Value string
Number int
}
type TestObject struct {
Name string
Data TestData
}
func verifyStringEquals(t *testing.T, actual, expected string) {
if actual != expected {
t.Errorf("Verification failed. Expected: %s, Found %s", expected, actual)
}
}
func verifyIntEquals(t *testing.T, actual, expected int) {
if actual != expected {
t.Errorf("Verification failed. Expected: %d, Found %d", expected, actual)
}
}
func verifyNoError(t *testing.T, e error) {
if e != nil {
t.Errorf("Expected no error, found %#v", e)
@ -91,17 +64,6 @@ func makeTestKubelet(t *testing.T) (*Kubelet, *tools.FakeEtcdClient, *FakeDocker
return kubelet, fakeEtcdClient, fakeDocker
}
func TestExtractJSON(t *testing.T) {
obj := TestObject{}
kubelet, _, _ := makeTestKubelet(t)
data := `{ "name": "foo", "data": { "value": "bar", "number": 10 } }`
kubelet.ExtractYAMLData([]byte(data), &obj)
verifyStringEquals(t, obj.Name, "foo")
verifyStringEquals(t, obj.Data.Value, "bar")
verifyIntEquals(t, obj.Data.Number, 10)
}
func verifyCalls(t *testing.T, fakeDocker *FakeDockerClient, calls []string) {
verifyStringArrayEquals(t, fakeDocker.called, calls)
}
@ -120,14 +82,15 @@ func verifyStringArrayEquals(t *testing.T, actual, expected []string) {
}
}
func verifyPackUnpack(t *testing.T, manifestID, containerName string) {
func verifyPackUnpack(t *testing.T, podNamespace, podName, containerName string) {
name := buildDockerName(
&api.ContainerManifest{ID: manifestID},
&Pod{Name: podName, Namespace: podNamespace},
&api.Container{Name: containerName},
)
returnedManifestID, returnedContainerName := parseDockerName(name)
if manifestID != returnedManifestID || containerName != returnedContainerName {
t.Errorf("For (%s, %s), unpacked (%s, %s)", manifestID, containerName, returnedManifestID, returnedContainerName)
podFullName := fmt.Sprintf("%s.%s", podName, podNamespace)
returnedPodFullName, returnedContainerName := parseDockerName(name)
if podFullName != returnedPodFullName || containerName != returnedContainerName {
t.Errorf("For (%s, %s), unpacked (%s, %s)", podFullName, containerName, returnedPodFullName, returnedContainerName)
}
}
@ -138,11 +101,11 @@ func verifyBoolean(t *testing.T, expected, value bool) {
}
func TestContainerManifestNaming(t *testing.T) {
verifyPackUnpack(t, "manifest1234", "container5678")
verifyPackUnpack(t, "manifest--", "container__")
verifyPackUnpack(t, "--manifest", "__container")
verifyPackUnpack(t, "m___anifest_", "container-_-")
verifyPackUnpack(t, "_m___anifest", "-_-container")
verifyPackUnpack(t, "file", "manifest1234", "container5678")
verifyPackUnpack(t, "file", "manifest--", "container__")
verifyPackUnpack(t, "file", "--manifest", "__container")
verifyPackUnpack(t, "", "m___anifest_", "container-_-")
verifyPackUnpack(t, "other", "_m___anifest", "-_-container")
}
func TestGetContainerID(t *testing.T) {
@ -224,39 +187,12 @@ func TestKillContainer(t *testing.T) {
verifyCalls(t, fakeDocker, []string{"stop"})
}
func TestResponseToContainersNil(t *testing.T) {
kubelet, _, _ := makeTestKubelet(t)
list, err := kubelet.ResponseToManifests(&etcd.Response{Node: nil})
if len(list) != 0 {
t.Errorf("Unexpected non-zero list: %#v", list)
}
if err == nil {
t.Error("Unexpected non-error")
}
}
func TestResponseToManifests(t *testing.T) {
kubelet, _, _ := makeTestKubelet(t)
list, err := kubelet.ResponseToManifests(&etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.ContainerManifest{
{ID: "foo"},
{ID: "bar"},
}),
},
})
if len(list) != 2 || list[0].ID != "foo" || list[1].ID != "bar" {
t.Errorf("Unexpected list: %#v", list)
}
expectNoError(t, err)
}
type channelReader struct {
list [][]api.ContainerManifest
list [][]Pod
wg sync.WaitGroup
}
func startReading(channel <-chan manifestUpdate) *channelReader {
func startReading(channel <-chan interface{}) *channelReader {
cr := &channelReader{}
cr.wg.Add(1)
go func() {
@ -265,118 +201,44 @@ func startReading(channel <-chan manifestUpdate) *channelReader {
if !ok {
break
}
cr.list = append(cr.list, update.manifests)
cr.list = append(cr.list, update.(PodUpdate).Pods)
}
cr.wg.Done()
}()
return cr
}
func (cr *channelReader) GetList() [][]api.ContainerManifest {
func (cr *channelReader) GetList() [][]Pod {
cr.wg.Wait()
return cr.list
}
func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: nil,
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
if err == nil {
t.Error("Unexpected no err.")
}
close(channel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestGetKubeletStateFromEtcd(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: util.MakeJSONString([]api.Container{}),
},
},
E: nil,
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
expectNoError(t, err)
close(channel)
list := reader.GetList()
if len(list) != 1 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
expectNoError(t, err)
close(channel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestGetKubeletStateFromEtcdError(t *testing.T) {
kubelet, fakeClient, _ := makeTestKubelet(t)
channel := make(chan manifestUpdate)
reader := startReading(channel)
fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{
R: &etcd.Response{},
E: &etcd.EtcdError{
ErrorCode: 200, // non not found error
},
}
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
if err == nil {
t.Error("Unexpected non-error")
}
close(channel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestSyncManifestsDoesNothing(t *testing.T) {
func TestSyncPodsDoesNothing(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
{
// format is k8s--<container-id>--<manifest-id>
Names: []string{"/k8s--bar--foo"},
// format is k8s--<container-id>--<pod-fullname>
Names: []string{"/k8s--bar--foo.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
fakeDocker.container = &docker.Container{
ID: "1234",
}
err := kubelet.SyncManifests([]api.ContainerManifest{
err := kubelet.SyncPods([]Pod{
{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
@ -384,17 +246,17 @@ func TestSyncManifestsDoesNothing(t *testing.T) {
verifyCalls(t, fakeDocker, []string{"list", "list"})
}
func TestSyncManifestsDeletes(t *testing.T) {
func TestSyncPodsDeletes(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
fakeDocker.containerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--foo--bar"},
Names: []string{"/k8s--foo--bar.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
{
@ -402,7 +264,7 @@ func TestSyncManifestsDeletes(t *testing.T) {
ID: "4567",
},
}
err := kubelet.SyncManifests([]api.ContainerManifest{})
err := kubelet.SyncPods([]Pod{})
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"list", "list", "stop", "stop"})
@ -425,29 +287,33 @@ func (f *FalseHealthChecker) HealthCheck(container api.Container) (health.Status
return health.Unhealthy, nil
}
func TestSyncManifestsUnhealthy(t *testing.T) {
func TestSyncPodsUnhealthy(t *testing.T) {
kubelet, _, fakeDocker := makeTestKubelet(t)
kubelet.HealthChecker = &FalseHealthChecker{}
fakeDocker.containerList = []docker.APIContainers{
{
// the k8s prefix is required for the kubelet to manage the container
Names: []string{"/k8s--bar--foo"},
Names: []string{"/k8s--bar--foo.test"},
ID: "1234",
},
{
// network container
Names: []string{"/k8s--net--foo--"},
Names: []string{"/k8s--net--foo.test--"},
ID: "9876",
},
}
err := kubelet.SyncManifests([]api.ContainerManifest{
err := kubelet.SyncPods([]Pod{
{
ID: "foo",
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.LivenessProbe{
// Always returns healthy == false
Type: "false",
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar",
LivenessProbe: &api.LivenessProbe{
// Always returns healthy == false
Type: "false",
},
},
},
},
@ -582,14 +448,20 @@ func TestMakeVolumesAndBinds(t *testing.T) {
},
}
pod := Pod{
Name: "pod",
Namespace: "test",
}
podVolumes := make(volumeMap)
podVolumes["disk4"] = &volume.HostDirectory{"/mnt/host"}
volumes, binds := makeVolumesAndBinds("pod", &container, podVolumes)
volumes, binds := makeVolumesAndBinds(&pod, &container, podVolumes)
expectedVolumes := []string{"/mnt/path", "/mnt/path2"}
expectedBinds := []string{"/exports/pod/disk:/mnt/path", "/exports/pod/disk2:/mnt/path2:ro", "/mnt/path3:/mnt/path3",
expectedBinds := []string{"/exports/pod.test/disk:/mnt/path", "/exports/pod.test/disk2:/mnt/path2:ro", "/mnt/path3:/mnt/path3",
"/mnt/host:/mnt/path4"}
if len(volumes) != len(expectedVolumes) {
t.Errorf("Unexpected volumes. Expected %#v got %#v. Container was: %#v", expectedVolumes, volumes, container)
}
@ -669,274 +541,29 @@ func TestMakePortsAndBindings(t *testing.T) {
}
func TestCheckHostPortConflicts(t *testing.T) {
successCaseAll := []api.ContainerManifest{
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}},
successCaseAll := []Pod{
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
successCaseNew := api.ContainerManifest{
Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}},
successCaseNew := Pod{
Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}},
}
if errs := checkHostPortConflicts(successCaseAll, &successCaseNew); len(errs) != 0 {
t.Errorf("Expected success: %v", errs)
expected := append(successCaseAll, successCaseNew)
if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
failureCaseAll := []api.ContainerManifest{
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}},
failureCaseAll := []Pod{
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
failureCaseNew := api.ContainerManifest{
Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}},
failureCaseNew := Pod{
Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
}
if errs := checkHostPortConflicts(failureCaseAll, &failureCaseNew); len(errs) == 0 {
t.Errorf("Expected failure")
}
}
func TestExtractFromNonExistentFile(t *testing.T) {
kubelet := New()
_, err := kubelet.extractFromFile("/some/fake/file")
if err == nil {
t.Error("Unexpected non-error.")
}
}
func TestExtractFromBadDataFile(t *testing.T) {
kubelet := New()
badData := []byte{1, 2, 3}
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
file.Close()
ioutil.WriteFile(name, badData, 0755)
_, err = kubelet.extractFromFile(name)
if err == nil {
t.Error("Unexpected non-error.")
}
}
func TestExtractFromValidDataFile(t *testing.T) {
kubelet := New()
manifest := api.ContainerManifest{ID: "bar"}
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile("", "foo")
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
read, err := kubelet.extractFromFile(name)
expectNoError(t, err)
if !reflect.DeepEqual(read, manifest) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifest, read)
}
}
func TestExtractFromEmptyDir(t *testing.T) {
kubelet := New()
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
_, err = kubelet.extractFromDir(dirName)
expectNoError(t, err)
}
func TestExtractFromDir(t *testing.T) {
kubelet := New()
manifests := []api.ContainerManifest{
{ID: "aaaa"},
{ID: "bbbb"},
}
dirName, err := ioutil.TempDir("", "foo")
expectNoError(t, err)
for _, manifest := range manifests {
data, err := json.Marshal(manifest)
expectNoError(t, err)
file, err := ioutil.TempFile(dirName, manifest.ID)
expectNoError(t, err)
name := file.Name()
expectNoError(t, file.Close())
ioutil.WriteFile(name, data, 0755)
}
read, err := kubelet.extractFromDir(dirName)
expectNoError(t, err)
if !reflect.DeepEqual(read, manifests) {
t.Errorf("Unexpected difference. Expected %#v, got %#v", manifests, read)
}
}
func TestExtractFromHttpBadness(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
err := kubelet.extractFromHTTP("http://localhost:12345", updateChannel)
if err == nil {
t.Error("Unexpected non-error.")
}
close(updateChannel)
list := reader.GetList()
if len(list) != 0 {
t.Errorf("Unexpected list: %#v", list)
}
}
func TestExtractFromHttpSingle(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{
{Version: "v1beta1", ID: "foo"},
}
// Taking a single-manifest from a URL allows kubelet to be used
// in the implementation of google's container VM image.
data, err := json.Marshal(manifests[0])
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
return
}
if !reflect.DeepEqual(manifests, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
}
}
func TestExtractFromHttpMultiple(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{
{Version: "v1beta1", ID: "foo"},
{Version: "v1beta1", ID: "bar"},
}
data, err := json.Marshal(manifests)
if err != nil {
t.Fatalf("Some weird json problem: %v", err)
}
t.Logf("Serving: %v", string(data))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
return
}
if !reflect.DeepEqual(manifests, read[0]) {
t.Errorf("Unexpected difference. Expected: %#v, Saw: %#v", manifests, read[0])
}
}
func TestExtractFromHttpEmptyArray(t *testing.T) {
kubelet := New()
updateChannel := make(chan manifestUpdate)
reader := startReading(updateChannel)
manifests := []api.ContainerManifest{}
data, err := json.Marshal(manifests)
if err != nil {
t.Fatalf("Some weird json problem: %v", err)
}
t.Logf("Serving: %v", string(data))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(data),
}
testServer := httptest.NewServer(&fakeHandler)
err = kubelet.extractFromHTTP(testServer.URL, updateChannel)
if err != nil {
t.Errorf("Unexpected error: %#v", err)
}
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Unexpected list: %#v", read)
return
}
if len(read[0]) != 0 {
t.Errorf("Unexpected manifests: %#v", read[0])
}
}
func TestWatchEtcd(t *testing.T) {
watchChannel := make(chan *etcd.Response)
updateChannel := make(chan manifestUpdate)
kubelet := New()
reader := startReading(updateChannel)
manifest := []api.ContainerManifest{
{
ID: "foo",
},
}
data, err := json.Marshal(manifest)
expectNoError(t, err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
kubelet.WatchEtcd(watchChannel, updateChannel)
wg.Done()
}()
watchChannel <- &etcd.Response{
Node: &etcd.Node{
Value: string(data),
},
}
close(watchChannel)
wg.Wait()
close(updateChannel)
read := reader.GetList()
if len(read) != 1 {
t.Errorf("Expected number of results: %v", len(read))
} else if !reflect.DeepEqual(read[0], manifest) {
t.Errorf("Unexpected manifest(s) %#v %#v", read[0], manifest)
if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)
}
}

View File

@ -22,28 +22,49 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"path"
"strconv"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/golang/glog"
"github.com/google/cadvisor/info"
"gopkg.in/v1/yaml"
)
// Server is a http.Handler which exposes kubelet functionality over HTTP.
type Server struct {
Kubelet kubeletInterface
UpdateChannel chan<- manifestUpdate
DelegateHandler http.Handler
host HostInterface
updates chan<- interface{}
handler http.Handler
}
// kubeletInterface contains all the kubelet methods required by the server.
func ListenAndServeKubeletServer(host HostInterface, updates chan<- interface{}, delegate http.Handler, address string, port uint) {
glog.Infof("Starting to listen on %s:%d", address, port)
handler := Server{
host: host,
updates: updates,
handler: delegate,
}
s := &http.Server{
Addr: net.JoinHostPort(address, strconv.FormatUint(uint64(port), 10)),
Handler: &handler,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
}
s.ListenAndServe()
}
// HostInterface contains all the kubelet methods required by the server.
// For testablitiy.
type kubeletInterface interface {
GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
type HostInterface interface {
GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
GetMachineInfo() (*info.MachineInfo, error)
GetPodInfo(name string) (api.PodInfo, error)
@ -78,13 +99,15 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
if u.Path == "/container" {
// This is to provide backward compatibility. It only supports a single manifest
var manifest api.ContainerManifest
err = yaml.Unmarshal(data, &manifest)
var pod Pod
err = yaml.Unmarshal(data, &pod.Manifest)
if err != nil {
s.error(w, err)
return
}
s.UpdateChannel <- manifestUpdate{httpServerSource, []api.ContainerManifest{manifest}}
//TODO: sha1 of manifest?
pod.Name = "1"
s.updates <- PodUpdate{[]Pod{pod}, SET}
} else if u.Path == "/containers" {
var manifests []api.ContainerManifest
err = yaml.Unmarshal(data, &manifests)
@ -92,15 +115,23 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
s.error(w, err)
return
}
s.UpdateChannel <- manifestUpdate{httpServerSource, manifests}
pods := make([]Pod, len(manifests))
for i := range manifests {
pods[i].Name = fmt.Sprintf("%d", i+1)
pods[i].Manifest = manifests[i]
}
s.updates <- PodUpdate{pods, SET}
}
case u.Path == "/podInfo":
podID := u.Query().Get("podID")
if len(podID) == 0 {
w.WriteHeader(http.StatusBadRequest)
http.Error(w, "Missing 'podID=' query entry.", http.StatusBadRequest)
return
}
info, err := s.Kubelet.GetPodInfo(podID)
// TODO: backwards compatibility with existing API, needs API change
podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"})
info, err := s.host.GetPodInfo(podFullName)
if err == ErrNoContainersInPod {
http.Error(w, "Pod does not exist", http.StatusNotFound)
return
@ -120,7 +151,7 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
case strings.HasPrefix(u.Path, "/stats"):
s.serveStats(w, req)
case strings.HasPrefix(u.Path, "/spec"):
info, err := s.Kubelet.GetMachineInfo()
info, err := s.host.GetMachineInfo()
if err != nil {
s.error(w, err)
return
@ -133,14 +164,16 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Content-type", "application/json")
w.Write(data)
case strings.HasPrefix(u.Path, "/logs/"):
s.Kubelet.ServeLogs(w, req)
s.host.ServeLogs(w, req)
default:
s.DelegateHandler.ServeHTTP(w, req)
if s.handler != nil {
s.handler.ServeHTTP(w, req)
}
}
}
func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
// /stats/<podid>/<containerName>
// /stats/<podfullname>/<containerName>
components := strings.Split(strings.TrimPrefix(path.Clean(req.URL.Path), "/"), "/")
var stats *info.ContainerInfo
var err error
@ -153,13 +186,13 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) {
switch len(components) {
case 1:
// Machine stats
stats, err = s.Kubelet.GetRootInfo(&query)
stats, err = s.host.GetRootInfo(&query)
case 2:
// pod stats
// TODO(monnand) Implement this
errors.New("pod level status currently unimplemented")
case 3:
stats, err = s.Kubelet.GetContainerInfo(components[1], components[2], &query)
stats, err = s.host.GetContainerInfo(components[1], components[2], &query)
default:
http.Error(w, "unknown resource.", http.StatusNotFound)
return

View File

@ -36,7 +36,7 @@ import (
type fakeKubelet struct {
infoFunc func(name string) (api.PodInfo, error)
containerInfoFunc func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
containerInfoFunc func(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error)
rootInfoFunc func(query *info.ContainerInfoRequest) (*info.ContainerInfo, error)
machineInfoFunc func() (*info.MachineInfo, error)
logFunc func(w http.ResponseWriter, req *http.Request)
@ -46,8 +46,8 @@ func (fk *fakeKubelet) GetPodInfo(name string) (api.PodInfo, error) {
return fk.infoFunc(name)
}
func (fk *fakeKubelet) GetContainerInfo(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
return fk.containerInfoFunc(podID, containerName, req)
func (fk *fakeKubelet) GetContainerInfo(podFullName, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
return fk.containerInfoFunc(podFullName, containerName, req)
}
func (fk *fakeKubelet) GetRootInfo(req *info.ContainerInfoRequest) (*info.ContainerInfo, error) {
@ -63,7 +63,7 @@ func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
}
type serverTestFramework struct {
updateChan chan manifestUpdate
updateChan chan interface{}
updateReader *channelReader
serverUnderTest *Server
fakeKubelet *fakeKubelet
@ -72,13 +72,13 @@ type serverTestFramework struct {
func makeServerTest() *serverTestFramework {
fw := &serverTestFramework{
updateChan: make(chan manifestUpdate),
updateChan: make(chan interface{}),
}
fw.updateReader = startReading(fw.updateChan)
fw.fakeKubelet = &fakeKubelet{}
fw.serverUnderTest = &Server{
Kubelet: fw.fakeKubelet,
UpdateChannel: fw.updateChan,
host: fw.fakeKubelet,
updates: fw.updateChan,
}
fw.testHTTPServer = httptest.NewServer(fw.serverUnderTest)
return fw
@ -106,8 +106,9 @@ func TestContainer(t *testing.T) {
if len(received) != 1 {
t.Errorf("Expected 1 manifest, but got %v", len(received))
}
if !reflect.DeepEqual(expected, received[0]) {
t.Errorf("Expected %#v, but got %#v", expected, received[0])
expectedPods := []Pod{Pod{Name: "1", Manifest: expected[0]}}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
}
@ -128,8 +129,9 @@ func TestContainers(t *testing.T) {
if len(received) != 1 {
t.Errorf("Expected 1 update, but got %v", len(received))
}
if !reflect.DeepEqual(expected, received[0]) {
t.Errorf("Expected %#v, but got %#v", expected, received[0])
expectedPods := []Pod{Pod{Name: "1", Manifest: expected[0]}, Pod{Name: "2", Manifest: expected[1]}}
if !reflect.DeepEqual(expectedPods, received[0]) {
t.Errorf("Expected %#v, but got %#v", expectedPods, received[0])
}
}
@ -137,10 +139,10 @@ func TestPodInfo(t *testing.T) {
fw := makeServerTest()
expected := api.PodInfo{"goodpod": docker.Container{ID: "myContainerID"}}
fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) {
if name == "goodpod" {
if name == "goodpod.etcd" {
return expected, nil
}
return nil, fmt.Errorf("bad pod")
return nil, fmt.Errorf("bad pod %s", name)
}
resp, err := http.Get(fw.testHTTPServer.URL + "/podInfo?podID=goodpod")
if err != nil {