Merge pull request #13052 from pmorie/podip-fix

Fix race condition for consuming podIP via downward API
pull/6/head
Marcin Wielgus 2015-09-02 16:04:01 +02:00
commit 3e9932557d
8 changed files with 238 additions and 77 deletions

View File

@ -285,16 +285,58 @@ type containerStatusResult struct {
err error
}
const podIPDownwardAPISelector = "status.podIP"
// podDependsOnIP returns whether any containers in a pod depend on using the pod IP via
// the downward API.
func podDependsOnPodIP(pod *api.Pod) bool {
for _, container := range pod.Spec.Containers {
for _, env := range container.Env {
if env.ValueFrom != nil &&
env.ValueFrom.FieldRef != nil &&
env.ValueFrom.FieldRef.FieldPath == podIPDownwardAPISelector {
return true
}
}
}
return false
}
// determineContainerIP determines the IP address of the given container. It is expected
// that the container passed is the infrastructure container of a pod and the responsibility
// of the caller to ensure that the correct container is passed.
func (dm *DockerManager) determineContainerIP(podNamespace, podName string, container *docker.Container) string {
result := ""
if container.NetworkSettings != nil {
result = container.NetworkSettings.IPAddress
}
if dm.networkPlugin.Name() != network.DefaultPluginName {
netStatus, err := dm.networkPlugin.Status(podNamespace, podName, kubeletTypes.DockerID(container.ID))
if err != nil {
glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), podName, err)
} else if netStatus != nil {
result = netStatus.IP.String()
}
}
return result
}
func (dm *DockerManager) inspectContainer(dockerID, containerName, tPath string, pod *api.Pod) *containerStatusResult {
result := containerStatusResult{api.ContainerStatus{}, "", nil}
inspectResult, err := dm.client.InspectContainer(dockerID)
if err != nil {
result.err = err
return &result
}
// NOTE (pmorie): this is a seriously fishy if statement. A nil result from InspectContainer seems like it should
// always be paired with a non-nil error in the result of InspectContainer.
if inspectResult == nil {
glog.Error("Received a nil result from InspectContainer without receiving an error")
// Why did we not get an error?
return &result
}
@ -312,18 +354,7 @@ func (dm *DockerManager) inspectContainer(dockerID, containerName, tPath string,
StartedAt: util.NewTime(inspectResult.State.StartedAt),
}
if containerName == PodInfraContainerName {
if inspectResult.NetworkSettings != nil {
result.ip = inspectResult.NetworkSettings.IPAddress
}
// override the above if a network plugin exists
if dm.networkPlugin.Name() != network.DefaultPluginName {
netStatus, err := dm.networkPlugin.Status(pod.Namespace, pod.Name, kubeletTypes.DockerID(dockerID))
if err != nil {
glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), pod.Name, err)
} else if netStatus != nil {
result.ip = netStatus.IP.String()
}
}
result.ip = dm.determineContainerIP(pod.Namespace, pod.Name, inspectResult)
}
} else if !inspectResult.State.FinishedAt.IsZero() {
reason := ""
@ -1345,7 +1376,7 @@ func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, contain
}
// Run a single container from a pod. Returns the docker container ID
func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, error) {
func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Container, netMode, ipcMode string) (kubeletTypes.DockerID, *docker.Container, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("runContainerInPod").Observe(metrics.SinceInMicroseconds(start))
@ -1358,7 +1389,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
opts, err := dm.generator.GenerateRunContainerOptions(pod, container)
if err != nil {
return "", err
return "", nil, err
}
utsMode := ""
@ -1367,7 +1398,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
}
id, err := dm.runContainer(pod, container, opts, ref, netMode, ipcMode, utsMode)
if err != nil {
return "", err
return "", nil, err
}
// Remember this reference so we can report events about this container
@ -1379,7 +1410,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
dm.KillContainerInPod(types.UID(id), container, pod)
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
return kubeletTypes.DockerID(""), nil, fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}
@ -1397,11 +1428,11 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
// Container information is used in adjusting OOM scores and adding ndots.
containerInfo, err := dm.client.InspectContainer(string(id))
if err != nil {
return "", err
return "", nil, err
}
// Ensure the PID actually exists, else we'll move ourselves.
if containerInfo.State.Pid == 0 {
return "", fmt.Errorf("failed to get init PID for Docker container %q", string(id))
return "", nil, fmt.Errorf("failed to get init PID for Docker container %q", string(id))
}
// Set OOM score of the container based on the priority of the container.
@ -1416,10 +1447,10 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
}
cgroupName, err := dm.procFs.GetFullContainerName(containerInfo.State.Pid)
if err != nil {
return "", err
return "", nil, err
}
if err = dm.oomAdjuster.ApplyOomScoreAdjContainer(cgroupName, oomScoreAdj, 5); err != nil {
return "", err
return "", nil, err
}
// currently, Docker does not have a flag by which the ndots option can be passed.
@ -1432,7 +1463,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
err = addNDotsOption(containerInfo.ResolvConfPath)
}
return kubeletTypes.DockerID(id), err
return kubeletTypes.DockerID(id), containerInfo, err
}
func addNDotsOption(resolvFilePath string) error {
@ -1466,7 +1497,7 @@ func appendToFile(filePath, stringToAppend string) error {
}
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, error) {
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.DockerID, *docker.Container, error) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("createPodInfraContainer").Observe(metrics.SinceInMicroseconds(start))
@ -1494,15 +1525,15 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc
// No pod secrets for the infra container.
if err := dm.imagePuller.PullImage(pod, container, nil); err != nil {
return "", err
return "", nil, err
}
id, err := dm.runContainerInPod(pod, container, netNamespace, "")
id, dockerContainer, err := dm.runContainerInPod(pod, container, netNamespace, "")
if err != nil {
return "", err
return "", nil, err
}
return id, nil
return id, dockerContainer, nil
}
// TODO(vmarmol): This will soon be made non-public when its only use is internal.
@ -1702,16 +1733,21 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
podInfraContainerID := containerChanges.InfraContainerId
if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
glog.V(4).Infof("Creating pod infra container for %q", podFullName)
podInfraContainerID, err = dm.createPodInfraContainer(pod)
// Call the networking plugin
podInfraContainerID, podInfraContainer, err := dm.createPodInfraContainer(pod)
if err == nil {
// Call the networking plugin
err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
}
if err != nil {
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
return err
}
if podDependsOnPodIP(pod) {
// Find the pod IP after starting the infra container in order to expose
// it safely via the downward API without a race.
pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
}
}
// Start everything
@ -1743,7 +1779,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
// TODO(dawnchen): Check RestartPolicy.DelaySeconds before restart a container
namespaceMode := fmt.Sprintf("container:%v", podInfraContainerID)
_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode)
_, _, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode)
dm.updateReasonCache(pod, container, err)
if err != nil {
// TODO(bburns) : Perhaps blacklist a container after N failures?

View File

@ -2334,3 +2334,62 @@ func TestGetUidFromUser(t *testing.T) {
}
}
}
func TestPodDependsOnPodIP(t *testing.T) {
tests := []struct {
name string
expected bool
env api.EnvVar
}{
{
name: "depends on pod IP",
expected: true,
env: api.EnvVar{
Name: "POD_IP",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: testapi.Version(),
FieldPath: "status.podIP",
},
},
},
},
{
name: "literal value",
expected: false,
env: api.EnvVar{
Name: "SOME_VAR",
Value: "foo",
},
},
{
name: "other downward api field",
expected: false,
env: api.EnvVar{
Name: "POD_NAME",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: testapi.Version(),
FieldPath: "metadata.name",
},
},
},
},
}
for _, tc := range tests {
pod := &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{Env: []api.EnvVar{tc.env}},
},
},
}
result := podDependsOnPodIP(pod)
if e, a := tc.expected, result; e != a {
t.Errorf("%v: Unexpected result; expected %v, got %v", tc.name, e, a)
}
}
}

View File

@ -45,7 +45,7 @@ var _ = Describe("Docker Containers", func() {
})
It("should use the image defaults if command and args are blank", func() {
testContainerOutputInNamespace("use defaults", c, entrypointTestPod(), 0, []string{
testContainerOutput("use defaults", c, entrypointTestPod(), 0, []string{
"[/ep default arguments]",
}, ns)
})
@ -54,7 +54,7 @@ var _ = Describe("Docker Containers", func() {
pod := entrypointTestPod()
pod.Spec.Containers[0].Args = []string{"override", "arguments"}
testContainerOutputInNamespace("override arguments", c, pod, 0, []string{
testContainerOutput("override arguments", c, pod, 0, []string{
"[/ep override arguments]",
}, ns)
})
@ -65,7 +65,7 @@ var _ = Describe("Docker Containers", func() {
pod := entrypointTestPod()
pod.Spec.Containers[0].Command = []string{"/ep-2"}
testContainerOutputInNamespace("override command", c, pod, 0, []string{
testContainerOutput("override command", c, pod, 0, []string{
"[/ep-2]",
}, ns)
})
@ -75,7 +75,7 @@ var _ = Describe("Docker Containers", func() {
pod.Spec.Containers[0].Command = []string{"/ep-2"}
pod.Spec.Containers[0].Args = []string{"override", "arguments"}
testContainerOutputInNamespace("override all", c, pod, 0, []string{
testContainerOutput("override all", c, pod, 0, []string{
"[/ep-2 override arguments]",
}, ns)
})

View File

@ -30,46 +30,85 @@ var _ = Describe("Downward API", func() {
It("should provide pod name and namespace as env vars", func() {
podName := "downward-api-" + string(util.NewUUID())
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "dapi-container",
Image: "gcr.io/google_containers/busybox",
Command: []string{"sh", "-c", "env"},
Env: []api.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
},
},
env := []api.EnvVar{
{
Name: "POD_NAME",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.name",
},
},
},
{
Name: "POD_NAMESPACE",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "metadata.namespace",
},
},
},
{
Name: "POD_IP",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.podIP",
},
},
RestartPolicy: api.RestartPolicyNever,
},
}
framework.TestContainerOutput("downward api env vars", pod, 0, []string{
expectations := []string{
fmt.Sprintf("POD_NAME=%v", podName),
fmt.Sprintf("POD_NAMESPACE=%v", framework.Namespace.Name),
})
}
testDownwardAPI(framework, podName, env, expectations)
})
It("should provide pod IP as an env var", func() {
podName := "downward-api-" + string(util.NewUUID())
env := []api.EnvVar{
{
Name: "POD_IP",
ValueFrom: &api.EnvVarSource{
FieldRef: &api.ObjectFieldSelector{
APIVersion: "v1",
FieldPath: "status.podIP",
},
},
},
}
expectations := []string{
"POD_IP=(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)\\.(?:\\d+)",
}
testDownwardAPI(framework, podName, env, expectations)
})
})
func testDownwardAPI(framework *Framework, podName string, env []api.EnvVar, expectations []string) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "dapi-container",
Image: "gcr.io/google_containers/busybox",
Command: []string{"sh", "-c", "env"},
Env: env,
},
},
RestartPolicy: api.RestartPolicyNever,
},
}
framework.TestContainerOutputRegexp("downward api env vars", pod, 0, expectations)
}

View File

@ -112,7 +112,12 @@ func (f *Framework) WaitForPodRunning(podName string) error {
// Runs the given pod and verifies that the output of exact container matches the desired output.
func (f *Framework) TestContainerOutput(scenarioName string, pod *api.Pod, containerIndex int, expectedOutput []string) {
testContainerOutputInNamespace(scenarioName, f.Client, pod, containerIndex, expectedOutput, f.Namespace.Name)
testContainerOutput(scenarioName, f.Client, pod, containerIndex, expectedOutput, f.Namespace.Name)
}
// Runs the given pod and verifies that the output of exact container matches the desired regexps.
func (f *Framework) TestContainerOutputRegexp(scenarioName string, pod *api.Pod, containerIndex int, expectedOutput []string) {
testContainerOutputRegexp(scenarioName, f.Client, pod, containerIndex, expectedOutput, f.Namespace.Name)
}
// WaitForAnEndpoint waits for at least one endpoint to become available in the

View File

@ -67,7 +67,7 @@ var _ = Describe("hostPath", func() {
fmt.Sprintf("--fs_type=%v", volumePath),
fmt.Sprintf("--file_mode=%v", volumePath),
}
testContainerOutputInNamespace("hostPath mode", c, pod, 0, []string{
testContainerOutput("hostPath mode", c, pod, 0, []string{
"mode of file \"/test-volume\": dtrwxrwxrwx", // we expect the sticky bit (mode flag t) to be set for the dir
},
namespace.Name)
@ -93,7 +93,7 @@ var _ = Describe("hostPath", func() {
}
//Read the content of the file with the second container to
//verify volumes being shared properly among continers within the pod.
testContainerOutputInNamespace("hostPath r/w", c, pod, 1, []string{
testContainerOutput("hostPath r/w", c, pod, 1, []string{
"content of file \"/test-volume/test-file\": mount-tester new file",
}, namespace.Name,
)

View File

@ -92,7 +92,7 @@ var _ = Describe("Secrets", func() {
},
}
testContainerOutputInNamespace("consume secrets", f.Client, pod, 0, []string{
testContainerOutput("consume secrets", f.Client, pod, 0, []string{
"content of file \"/etc/secret-volume/data-1\": value-1",
"mode of file \"/etc/secret-volume/data-1\": -r--r--r--",
}, f.Namespace.Name)

View File

@ -55,6 +55,8 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
gomegatypes "github.com/onsi/gomega/types"
)
const (
@ -1017,9 +1019,29 @@ func tryKill(cmd *exec.Cmd) {
}
// testContainerOutputInNamespace runs the given pod in the given namespace and waits
// for all of the containers in the podSpec to move into the 'Success' status. It retrieves
// the exact container log and searches for lines of expected output.
func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *api.Pod, containerIndex int, expectedOutput []string, ns string) {
// for all of the containers in the podSpec to move into the 'Success' status, and tests
// the specified container log against the given expected output using a substring matcher.
func testContainerOutput(scenarioName string, c *client.Client, pod *api.Pod, containerIndex int, expectedOutput []string, ns string) {
testContainerOutputMatcher(scenarioName, c, pod, containerIndex, expectedOutput, ns, ContainSubstring)
}
// testContainerOutputInNamespace runs the given pod in the given namespace and waits
// for all of the containers in the podSpec to move into the 'Success' status, and tests
// the specified container log against the given expected output using a regexp matcher.
func testContainerOutputRegexp(scenarioName string, c *client.Client, pod *api.Pod, containerIndex int, expectedOutput []string, ns string) {
testContainerOutputMatcher(scenarioName, c, pod, containerIndex, expectedOutput, ns, MatchRegexp)
}
// testContainerOutputInNamespace runs the given pod in the given namespace and waits
// for all of the containers in the podSpec to move into the 'Success' status, and tests
// the specified container log against the given expected output using the given matcher.
func testContainerOutputMatcher(scenarioName string,
c *client.Client,
pod *api.Pod,
containerIndex int,
expectedOutput []string, ns string,
matcher func(string, ...interface{}) gomegatypes.GomegaMatcher) {
By(fmt.Sprintf("Creating a pod to test %v", scenarioName))
defer c.Pods(ns).Delete(pod.Name, api.NewDeleteOptions(0))
@ -1075,7 +1097,7 @@ func testContainerOutputInNamespace(scenarioName string, c *client.Client, pod *
}
for _, m := range expectedOutput {
Expect(string(logs)).To(ContainSubstring(m), "%q in container output", m)
Expect(string(logs)).To(matcher(m), "%q in container output", m)
}
}