From 4ff66bd70ad128d05f26ad9615e48ed5d167ee23 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Fri, 21 Aug 2015 18:10:22 -0400 Subject: [PATCH] Fix race exposing pod IP via downward API --- pkg/kubelet/dockertools/manager.go | 98 +++++++++++++++------- pkg/kubelet/dockertools/manager_test.go | 59 +++++++++++++ test/e2e/docker_containers.go | 8 +- test/e2e/downward_api.go | 107 ++++++++++++++++-------- test/e2e/framework.go | 7 +- test/e2e/host_path.go | 4 +- test/e2e/secrets.go | 2 +- test/e2e/util.go | 30 ++++++- 8 files changed, 238 insertions(+), 77 deletions(-) diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 4f51b492f2..b4243ced77 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -285,16 +285,58 @@ type containerStatusResult struct { err error } +const podIPDownwardAPISelector = "status.podIP" + +// podDependsOnPodIP returns whether any containers in a pod depend on using the pod IP via +// the downward API. +func podDependsOnPodIP(pod *api.Pod) bool { + for _, container := range pod.Spec.Containers { + for _, env := range container.Env { + if env.ValueFrom != nil && + env.ValueFrom.FieldRef != nil && + env.ValueFrom.FieldRef.FieldPath == podIPDownwardAPISelector { + return true + } + } + } + + return false +} + +// determineContainerIP determines the IP address of the given container. It is expected +// that the container passed is the infrastructure container of a pod and it is the responsibility +// of the caller to ensure that the correct container is passed. 
+func (dm *DockerManager) determineContainerIP(podNamespace, podName string, container *docker.Container) string { + result := "" + + if container.NetworkSettings != nil { + result = container.NetworkSettings.IPAddress + } + + if dm.networkPlugin.Name() != network.DefaultPluginName { + netStatus, err := dm.networkPlugin.Status(podNamespace, podName, kubeletTypes.DockerID(container.ID)) + if err != nil { + glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), podName, err) + } else if netStatus != nil { + result = netStatus.IP.String() + } + } + + return result +} + func (dm *DockerManager) inspectContainer(dockerID, containerName, tPath string, pod *api.Pod) *containerStatusResult { result := containerStatusResult{api.ContainerStatus{}, "", nil} inspectResult, err := dm.client.InspectContainer(dockerID) - if err != nil { result.err = err return &result } + // NOTE (pmorie): this is a seriously fishy if statement. A nil result from InspectContainer seems like it should + // always be paired with a non-nil error in the result of InspectContainer. if inspectResult == nil { + glog.Error("Received a nil result from InspectContainer without receiving an error") // Why did we not get an error? 
return &result } @@ -312,18 +354,7 @@ func (dm *DockerManager) inspectContainer(dockerID, containerName, tPath string, StartedAt: util.NewTime(inspectResult.State.StartedAt), } if containerName == PodInfraContainerName { - if inspectResult.NetworkSettings != nil { - result.ip = inspectResult.NetworkSettings.IPAddress - } - // override the above if a network plugin exists - if dm.networkPlugin.Name() != network.DefaultPluginName { - netStatus, err := dm.networkPlugin.Status(pod.Namespace, pod.Name, kubeletTypes.DockerID(dockerID)) - if err != nil { - glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), pod.Name, err) - } else if netStatus != nil { - result.ip = netStatus.IP.String() - } - } + result.ip = dm.determineContainerIP(pod.Namespace, pod.Name, inspectResult) } } else if !inspectResult.State.FinishedAt.IsZero() { reason := "" @@ -1344,7 +1375,7 @@ func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, contain } // Run a single container from a pod. 
Returns the docker container ID -func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) { +func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, *docker.Container, error) { start := time.Now() defer func() { metrics.ContainerManagerLatency.WithLabelValues("runContainerInPod").Observe(metrics.SinceInMicroseconds(start)) @@ -1357,7 +1388,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe opts, err := dm.generator.GenerateRunContainerOptions(pod, container) if err != nil { - return "", err + return "", nil, err } utsMode := "" @@ -1366,7 +1397,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe } id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode) if err != nil { - return "", err + return "", nil, err } // Remember this reference so we can report events about this container @@ -1378,7 +1409,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart) if handlerErr != nil { dm.KillContainerInPod(types.UID(id), container, pod) - return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr) + return kubeletTypes.DockerID(""), nil, fmt.Errorf("failed to call event handler: %v", handlerErr) } } @@ -1396,11 +1427,11 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe // Container information is used in adjusting OOM scores and adding ndots. containerInfo, err := dm.client.InspectContainer(string(id)) if err != nil { - return "", err + return "", nil, err } // Ensure the PID actually exists, else we'll move ourselves. 
if containerInfo.State.Pid == 0 { - return "", fmt.Errorf("failed to get init PID for Docker container %q", string(id)) + return "", nil, fmt.Errorf("failed to get init PID for Docker container %q", string(id)) } // Set OOM score of the container based on the priority of the container. @@ -1415,10 +1446,10 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe } cgroupName, err := dm.procFs.GetFullContainerName(containerInfo.State.Pid) if err != nil { - return "", err + return "", nil, err } if err = dm.oomAdjuster.ApplyOomScoreAdjContainer(cgroupName, oomScoreAdj, 5); err != nil { - return "", err + return "", nil, err } // currently, Docker does not have a flag by which the ndots option can be passed. @@ -1431,7 +1462,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe err = addNDotsOption(containerInfo.ResolvConfPath) } - return kubeletTypes.DockerID(id), err + return kubeletTypes.DockerID(id), containerInfo, err } func addNDotsOption(resolvFilePath string) error { @@ -1465,7 +1496,7 @@ func appendToFile(filePath, stringToAppend string) error { } // createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container. -func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) { +func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, *docker.Container, error) { start := time.Now() defer func() { metrics.ContainerManagerLatency.WithLabelValues("createPodInfraContainer").Observe(metrics.SinceInMicroseconds(start)) @@ -1493,15 +1524,15 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc // No pod secrets for the infra container. 
if err := dm.imagePuller.PullImage(pod, container, nil); err != nil { - return "", err + return "", nil, err } - id, err := dm.runContainerInPod(pod, container, netNamespace, "") + id, dockerContainer, err := dm.runContainerInPod(pod, container, netNamespace, "") if err != nil { - return "", err + return "", nil, err } - return id, nil + return id, dockerContainer, nil } // TODO(vmarmol): This will soon be made non-public when its only use is internal. @@ -1701,16 +1732,23 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod podInfraContainerID := containerChanges.InfraContainerId if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) { glog.V(4).Infof("Creating pod infra container for %q", podFullName) - podInfraContainerID, err = dm.createPodInfraContainer(pod) - - // Call the networking plugin + // Assign with '=' rather than ':=' so the outer podInfraContainerID and err are updated instead of shadowed. + var podInfraContainer *docker.Container + podInfraContainerID, podInfraContainer, err = dm.createPodInfraContainer(pod) if err == nil { + // Call the networking plugin err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID) } if err != nil { glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName) return err } + + if podDependsOnPodIP(pod) { + // Find the pod IP after starting the infra container in order to expose + // it safely via the downward API without a race. 
+ pod.Status.PodIP = dm.determineContainerIP(pod.Namespace, pod.Name, podInfraContainer) + } } // Start everything @@ -1742,7 +1780,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod // TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID) - _, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode) + _, _, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode) dm.updateReasonCache(pod, container, err) if err != nil { // TODO(bburns) : Perhaps blacklist a container after N failures? diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 2951682c4f..a3d65bf15b 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -2334,3 +2334,62 @@ func TestGetUidFromUser(t *testing.T) { } } } + +func TestPodDependsOnPodIP(t *testing.T) { + tests := []struct { + name string + expected bool + env api.EnvVar + }{ + { + name: "depends on pod IP", + expected: true, + env: api.EnvVar{ + Name: "POD_IP", + ValueFrom: &api.EnvVarSource{ + FieldRef: &api.ObjectFieldSelector{ + APIVersion: testapi.Version(), + FieldPath: "status.podIP", + }, + }, + }, + }, + { + name: "literal value", + expected: false, + env: api.EnvVar{ + Name: "SOME_VAR", + Value: "foo", + }, + }, + { + name: "other downward api field", + expected: false, + env: api.EnvVar{ + Name: "POD_NAME", + ValueFrom: &api.EnvVarSource{ + FieldRef: &api.ObjectFieldSelector{ + APIVersion: testapi.Version(), + FieldPath: "metadata.name", + }, + }, + }, + }, + } + + for _, tc := range tests { + pod := &api.Pod{ + Spec: api.PodSpec{ + Containers: []api.Container{ + {Env: []api.EnvVar{tc.env}}, + }, + }, + } + + result := podDependsOnPodIP(pod) + if e, a := tc.expected, result; e != a { + t.Errorf("%v: Unexpected result; expected %v, got %v", tc.name, e, a) + } + } + +} diff --git 
a/test/e2e/docker_containers.go b/test/e2e/docker_containers.go index cae777a984..dfb585a70f 100644 --- a/test/e2e/docker_containers.go +++ b/test/e2e/docker_containers.go @@ -45,7 +45,7 @@ var _ = Describe("Docker Containers", func() { }) It("should use the image defaults if command and args are blank", func() { - testContainerOutputInNamespace("use defaults", c, entrypointTestPod(), 0, []string{ + testContainerOutput("use defaults", c, entrypointTestPod(), 0, []string{ "[/ep default arguments]", }, ns) }) @@ -54,7 +54,7 @@ var _ = Describe("Docker Containers", func() { pod := entrypointTestPod() pod.Spec.Containers[0].Args = []string{"override", "arguments"} - testContainerOutputInNamespace("override arguments", c, pod, 0, []string{ + testContainerOutput("override arguments", c, pod, 0, []string{ "[/ep override arguments]", }, ns) }) @@ -65,7 +65,7 @@ var _ = Describe("Docker Containers", func() { pod := entrypointTestPod() pod.Spec.Containers[0].Command = []string{"/ep-2"} - testContainerOutputInNamespace("override command", c, pod, 0, []string{ + testContainerOutput("override command", c, pod, 0, []string{ "[/ep-2]", }, ns) }) @@ -75,7 +75,7 @@ var _ = Describe("Docker Containers", func() { pod.Spec.Containers[0].Command = []string{"/ep-2"} pod.Spec.Containers[0].Args = []string{"override", "arguments"} - testContainerOutputInNamespace("override all", c, pod, 0, []string{ + testContainerOutput("override all", c, pod, 0, []string{ "[/ep-2 override arguments]", }, ns) }) diff --git a/test/e2e/downward_api.go b/test/e2e/downward_api.go index e5ade3f0fa..4936409631 100644 --- a/test/e2e/downward_api.go +++ b/test/e2e/downward_api.go @@ -30,46 +30,85 @@ var _ = Describe("Downward API", func() { It("should provide pod name and namespace as env vars", func() { podName := "downward-api-" + string(util.NewUUID()) - pod := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: podName, - Labels: map[string]string{"name": podName}, - }, - Spec: api.PodSpec{ - Containers: 
[]api.Container{ - { - Name: "dapi-container", - Image: "gcr.io/google_containers/busybox", - Command: []string{"sh", "-c", "env"}, - Env: []api.EnvVar{ - { - Name: "POD_NAME", - ValueFrom: &api.EnvVarSource{ - FieldRef: &api.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "metadata.name", - }, - }, - }, - { - Name: "POD_NAMESPACE", - ValueFrom: &api.EnvVarSource{ - FieldRef: &api.ObjectFieldSelector{ - APIVersion: "v1", - FieldPath: "metadata.namespace", - }, - }, - }, - }, + env := []api.EnvVar{ + { + Name: "POD_NAME", + ValueFrom: &api.EnvVarSource{ + FieldRef: &api.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.name", + }, + }, + }, + { + Name: "POD_NAMESPACE", + ValueFrom: &api.EnvVarSource{ + FieldRef: &api.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "metadata.namespace", + }, + }, + }, + { + Name: "POD_IP", + ValueFrom: &api.EnvVarSource{ + FieldRef: &api.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", }, }, - RestartPolicy: api.RestartPolicyNever, }, } - framework.TestContainerOutput("downward api env vars", pod, 0, []string{ + expectations := []string{ fmt.Sprintf("POD_NAME=%v", podName), fmt.Sprintf("POD_NAMESPACE=%v", framework.Namespace.Name), - }) + } + + testDownwardAPI(framework, podName, env, expectations) }) + + It("should provide pod IP as an env var", func() { + podName := "downward-api-" + string(util.NewUUID()) + env := []api.EnvVar{ + { + Name: "POD_IP", + ValueFrom: &api.EnvVarSource{ + FieldRef: &api.ObjectFieldSelector{ + APIVersion: "v1", + FieldPath: "status.podIP", + }, + }, + }, + } + + expectations := []string{ + "POD_IP=(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)", + } + + testDownwardAPI(framework, podName, env, expectations) + }) + }) + +func testDownwardAPI(framework *Framework, podName string, env []api.EnvVar, expectations []string) { + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: podName, + Labels: map[string]string{"name": podName}, + }, + Spec: api.PodSpec{ + 
Containers: []api.Container{ + { + Name: "dapi-container", + Image: "gcr.io/google_containers/busybox", + Command: []string{"sh", "-c", "env"}, + Env: env, + }, + }, + RestartPolicy: api.RestartPolicyNever, + }, + } + + framework.TestContainerOutputRegexp("downward api env vars", pod, 0, expectations) +} diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 136b42ab7a..bdc6383db6 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -112,7 +112,12 @@ func (f *Framework) WaitForPodRunning(podName string) error { // Runs the given pod and verifies that the output of exact container matches the desired output. func (f *Framework) TestContainerOutput(scenarioName string, pod *api.Pod, containerIndex int, expectedOutput []string) { - testContainerOutputInNamespace(scenarioName, f.Client, pod, containerIndex, expectedOutput, f.Namespace.Name) + testContainerOutput(scenarioName, f.Client, pod, containerIndex, expectedOutput, f.Namespace.Name) +} + +// Runs the given pod and verifies that the output of exact container matches the desired regexps. 
+func (f *Framework) TestContainerOutputRegexp(scenarioName string, pod *api.Pod, containerIndex int, expectedOutput []string) { + testContainerOutputRegexp(scenarioName, f.Client, pod, containerIndex, expectedOutput, f.Namespace.Name) } // WaitForAnEndpoint waits for at least one endpoint to become available in the diff --git a/test/e2e/host_path.go b/test/e2e/host_path.go index 6e55c62bd5..6bc4ccdc46 100644 --- a/test/e2e/host_path.go +++ b/test/e2e/host_path.go @@ -67,7 +67,7 @@ var _ = Describe("hostPath", func() { fmt.Sprintf("--fs_type=%v", volumePath), fmt.Sprintf("--file_mode=%v", volumePath), } - testContainerOutputInNamespace("hostPath mode", c, pod, 0, []string{ + testContainerOutput("hostPath mode", c, pod, 0, []string{ "mode of file \"/test-volume\": dtrwxrwxrwx", // we expect the sticky bit (mode flag t) to be set for the dir }, namespace.Name) @@ -93,7 +93,7 @@ var _ = Describe("hostPath", func() { } //Read the content of the file with the second container to //verify volumes being shared properly among continers within the pod. - testContainerOutputInNamespace("hostPath r/w", c, pod, 1, []string{ + testContainerOutput("hostPath r/w", c, pod, 1, []string{ "content of file \"/test-volume/test-file\": mount-tester new file", }, namespace.Name, ) diff --git a/test/e2e/secrets.go b/test/e2e/secrets.go index 17f21e3e60..26a57438ec 100644 --- a/test/e2e/secrets.go +++ b/test/e2e/secrets.go @@ -92,7 +92,7 @@ var _ = Describe("Secrets", func() { }, } - testContainerOutputInNamespace("consume secrets", f.Client, pod, 0, []string{ + testContainerOutput("consume secrets", f.Client, pod, 0, []string{ "content of file \"/etc/secret-volume/data-1\": value-1", "mode of file \"/etc/secret-volume/data-1\": -r--r--r--", }, f.Namespace.Name) diff --git a/test/e2e/util.go b/test/e2e/util.go index 1f6e96f067..79f0034eac 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -55,6 +55,8 @@ import ( . "github.com/onsi/ginkgo" . 
"github.com/onsi/gomega" + + gomegatypes "github.com/onsi/gomega/types" ) const ( @@ -1011,9 +1013,29 @@ func tryKill(cmd *exec.Cmd) { } -// testContainerOutputInNamespace runs the given pod in the given namespace and waits -// for all of the containers in the podSpec to move into the 'Success' status. It retrieves -// the exact container log and searches for lines of expected output. -func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *api.Pod, containerIndex int, expectedOutput []string, ns string) { +// testContainerOutput runs the given pod in the given namespace and waits +// for all of the containers in the podSpec to move into the 'Success' status, and tests +// the specified container log against the given expected output using a substring matcher. +func testContainerOutput(scenarioName string, c *client.Client, pod *api.Pod, containerIndex int, expectedOutput []string, ns string) { + testContainerOutputMatcher(scenarioName, c, pod, containerIndex, expectedOutput, ns, ContainSubstring) +} + +// testContainerOutputRegexp runs the given pod in the given namespace and waits +// for all of the containers in the podSpec to move into the 'Success' status, and tests +// the specified container log against the given expected output using a regexp matcher. +func testContainerOutputRegexp(scenarioName string, c *client.Client, pod *api.Pod, containerIndex int, expectedOutput []string, ns string) { + testContainerOutputMatcher(scenarioName, c, pod, containerIndex, expectedOutput, ns, MatchRegexp) +} + +// testContainerOutputMatcher runs the given pod in the given namespace and waits +// for all of the containers in the podSpec to move into the 'Success' status, and tests +// the specified container log against the given expected output using the given matcher. 
+func testContainerOutputMatcher(scenarioName string, + c *client.Client, + pod *api.Pod, + containerIndex int, + expectedOutput []string, ns string, + matcher func(string, ...interface{}) gomegatypes.GomegaMatcher) { + By(fmt.Sprintf("Creating a pod to test %v", scenarioName)) defer c.Pods(ns).Delete(pod.Name, api.NewDeleteOptions(0)) @@ -1069,7 +1091,7 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod * } for _, m := range expectedOutput { - Expect(string(logs)).To(ContainSubstring(m), "%q in container output", m) + Expect(string(logs)).To(matcher(m), "%q in container output", m) } }