Merge pull request #1546 from smarterclayton/allow_configurable_net_image

Allow configurable Kubelet net image for isolated networks
pull/6/head
Tim Hockin 2014-10-02 17:11:32 -07:00
commit f7db0bc674
4 changed files with 118 additions and 41 deletions

View File

@ -48,21 +48,22 @@ import (
const defaultRootDir = "/var/lib/kubelet" const defaultRootDir = "/var/lib/kubelet"
var ( var (
config = flag.String("config", "", "Path to the config file or directory of files") config = flag.String("config", "", "Path to the config file or directory of files")
syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config")
fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data") fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data")
httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data")
manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest") manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest")
enableServer = flag.Bool("enable_server", true, "Enable the info server") enableServer = flag.Bool("enable_server", true, "Enable the info server")
address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)") address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)")
port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on") port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on")
hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.")
dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.")
etcdServerList util.StringList dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with")
rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") etcdServerList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).")
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
) )
func init() { func init() {
@ -159,6 +160,7 @@ func main() {
cadvisorClient, cadvisorClient,
etcdClient, etcdClient,
*rootDirectory, *rootDirectory,
*networkContainerImage,
*syncFrequency, *syncFrequency,
float32(*registryPullQPS), float32(*registryPullQPS),
*registryBurst) *registryBurst)

View File

@ -81,7 +81,7 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do
// This is not a very good fake. We'll just add this container's name to the list. // This is not a very good fake. We'll just add this container's name to the list.
// Docker likes to add a '/', so copy that behavior. // Docker likes to add a '/', so copy that behavior.
name := "/" + c.Name name := "/" + c.Name
f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}}) f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}, Image: c.Config.Image})
return &docker.Container{ID: name}, nil return &docker.Container{ID: name}, nil
} }
@ -138,6 +138,7 @@ func (f *FakeDockerClient) InspectImage(name string) (*docker.Image, error) {
type FakeDockerPuller struct { type FakeDockerPuller struct {
sync.Mutex sync.Mutex
HasImages []string
ImagesPulled []string ImagesPulled []string
// Every pull will return the first error here, and then reslice // Every pull will return the first error here, and then reslice
@ -159,5 +160,15 @@ func (f *FakeDockerPuller) Pull(image string) (err error) {
} }
func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) { func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
return true, nil f.Lock()
defer f.Unlock()
if f.HasImages == nil {
return true, nil
}
for _, s := range f.HasImages {
if s == name {
return true, nil
}
}
return false, nil
} }

View File

@ -67,21 +67,23 @@ func NewMainKubelet(
cc CadvisorInterface, cc CadvisorInterface,
ec tools.EtcdClient, ec tools.EtcdClient,
rd string, rd string,
ni string,
ri time.Duration, ri time.Duration,
pullQPS float32, pullQPS float32,
pullBurst int) *Kubelet { pullBurst int) *Kubelet {
return &Kubelet{ return &Kubelet{
hostname: hn, hostname: hn,
dockerClient: dc, dockerClient: dc,
cadvisorClient: cc, cadvisorClient: cc,
etcdClient: ec, etcdClient: ec,
rootDirectory: rd, rootDirectory: rd,
resyncInterval: ri, resyncInterval: ri,
podWorkers: newPodWorkers(), networkContainerImage: ni,
runner: dockertools.NewDockerContainerCommandRunner(), podWorkers: newPodWorkers(),
httpClient: &http.Client{}, runner: dockertools.NewDockerContainerCommandRunner(),
pullQPS: pullQPS, httpClient: &http.Client{},
pullBurst: pullBurst, pullQPS: pullQPS,
pullBurst: pullBurst,
} }
} }
@ -89,11 +91,12 @@ func NewMainKubelet(
// TODO: add more integration tests, and expand parameter list as needed. // TODO: add more integration tests, and expand parameter list as needed.
func NewIntegrationTestKubelet(hn string, dc dockertools.DockerInterface) *Kubelet { func NewIntegrationTestKubelet(hn string, dc dockertools.DockerInterface) *Kubelet {
return &Kubelet{ return &Kubelet{
hostname: hn, hostname: hn,
dockerClient: dc, dockerClient: dc,
dockerPuller: &dockertools.FakeDockerPuller{}, dockerPuller: &dockertools.FakeDockerPuller{},
resyncInterval: 3 * time.Second, networkContainerImage: NetworkContainerImage,
podWorkers: newPodWorkers(), resyncInterval: 3 * time.Second,
podWorkers: newPodWorkers(),
} }
} }
@ -103,11 +106,12 @@ type httpGetInterface interface {
// Kubelet is the main kubelet implementation. // Kubelet is the main kubelet implementation.
type Kubelet struct { type Kubelet struct {
hostname string hostname string
dockerClient dockertools.DockerInterface dockerClient dockertools.DockerInterface
rootDirectory string rootDirectory string
podWorkers podWorkers networkContainerImage string
resyncInterval time.Duration podWorkers podWorkers
resyncInterval time.Duration
// Optional, no events will be sent without it // Optional, no events will be sent without it
etcdClient tools.EtcdClient etcdClient tools.EtcdClient
@ -368,7 +372,7 @@ func (kl *Kubelet) killContainerByID(ID, name string) error {
const ( const (
networkContainerName = "net" networkContainerName = "net"
networkContainerImage = "kubernetes/pause:latest" NetworkContainerImage = "kubernetes/pause:latest"
) )
// createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container. // createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container.
@ -381,12 +385,19 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error
} }
container := &api.Container{ container := &api.Container{
Name: networkContainerName, Name: networkContainerName,
Image: networkContainerImage, Image: kl.networkContainerImage,
Ports: ports, Ports: ports,
} }
if err := kl.dockerPuller.Pull(networkContainerImage); err != nil { // TODO: make this a TTL based pull (if image older than X policy, pull)
ok, err := kl.dockerPuller.IsImagePresent(container.Image)
if err != nil {
return "", err return "", err
} }
if !ok {
if err := kl.dockerPuller.Pull(container.Image); err != nil {
return "", err
}
}
return kl.runContainer(pod, container, nil, "") return kl.runContainer(pod, container, nil, "")
} }

View File

@ -22,6 +22,7 @@ import (
"reflect" "reflect"
"regexp" "regexp"
"strconv" "strconv"
"strings"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -206,6 +207,7 @@ func matchString(t *testing.T, pattern, str string) bool {
func TestSyncPodsCreatesNetAndContainer(t *testing.T) { func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t) kubelet, _, fakeDocker := newTestKubelet(t)
kubelet.networkContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{} fakeDocker.ContainerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{ err := kubelet.SyncPods([]Pod{
{ {
@ -228,6 +230,57 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
"list", "list", "create", "start", "list", "inspect", "list", "create", "start"}) "list", "list", "create", "start", "list", "inspect", "list", "create", "start"})
fakeDocker.Lock() fakeDocker.Lock()
found := false
for _, c := range fakeDocker.ContainerList {
if c.Image == "custom_image_name" && strings.HasPrefix(c.Names[0], "/k8s_net") {
found = true
}
}
if !found {
t.Errorf("Custom net container not found: %v", fakeDocker.ContainerList)
}
if len(fakeDocker.Created) != 2 ||
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {
t.Errorf("Unexpected containers created %v", fakeDocker.Created)
}
fakeDocker.Unlock()
}
func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
kubelet, _, fakeDocker := newTestKubelet(t)
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
puller.HasImages = []string{}
kubelet.networkContainerImage = "custom_image_name"
fakeDocker.ContainerList = []docker.APIContainers{}
err := kubelet.SyncPods([]Pod{
{
Name: "foo",
Namespace: "test",
Manifest: api.ContainerManifest{
ID: "foo",
Containers: []api.Container{
{Name: "bar"},
},
},
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubelet.drainWorkers()
verifyCalls(t, fakeDocker, []string{
"list", "list", "create", "start", "list", "inspect", "list", "create", "start"})
fakeDocker.Lock()
if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", ""}) {
t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled)
}
if len(fakeDocker.Created) != 2 || if len(fakeDocker.Created) != 2 ||
!matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) ||
!matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {