diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 78cf37a95a..27d43f11bc 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -515,7 +515,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint Ports: []v1.EndpointPort{epp}, }) readyEps++ - } else { + } else if shouldPodBeInEndpoints(pod) { glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, @@ -525,3 +525,14 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint } return subsets, readyEps, notReadyEps } + +func shouldPodBeInEndpoints(pod *v1.Pod) bool { + switch pod.Spec.RestartPolicy { + case v1.RestartPolicyNever: + return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded + case v1.RestartPolicyOnFailure: + return pod.Status.Phase != v1.PodSucceeded + default: + return true + } +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 9a45cd84e2..89a181557a 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -76,6 +76,38 @@ func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotRea } } +func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) { + for i := 0; i < nPods; i++ { + p := &v1.Pod{ + TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("pod%d", i), + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + RestartPolicy: restartPolicy, + Containers: []v1.Container{{Ports: []v1.ContainerPort{}}}, + }, + Status: v1.PodStatus{ + PodIP: fmt.Sprintf("1.2.3.%d", 4+i), + Phase: podPhase, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionFalse, + }, + }, + }, + } + for j := 0; j < nPorts; j++ { + p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, + v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)}) + } + store.Add(p) + } +} + type serverResponse struct { statusCode int obj interface{} @@ -661,3 +693,202 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { endpointsHandler.ValidateRequestCount(t, 1) endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } + +func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Subsets: []v1.EndpointSubset{}, + }) + addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} + +func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Subsets: []v1.EndpointSubset{}, + }) + addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} + +func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + Labels: map[string]string{ + "foo": "bar", + }, + }, + Subsets: []v1.EndpointSubset{}, + }) + addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} + +// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here. +// Just list all of the 3 false cases and 3 of the 12 true cases. +func TestShouldPodBeInEndpoints(t *testing.T) { + testCases := []struct { + name string + pod *v1.Pod + expected bool + }{ + // Pod should not be in endpoints cases: + { + name: "Failed pod with Never RestartPolicy", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + }, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + }, + expected: false, + }, + { + name: "Succeeded pod with Never RestartPolicy", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + }, + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + expected: false, + }, + { + name: "Succeeded pod with OnFailure RestartPolicy", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + }, + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + expected: false, + }, + // Pod should be in endpoints cases: + { + name: "Failed pod with Always RestartPolicy", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyAlways, + }, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + }, + expected: true, + }, + { + name: "Pending pod with Never RestartPolicy", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expected: true, + }, + { + name: "Unknown pod with OnFailure RestartPolicy", + pod: &v1.Pod{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyOnFailure, + }, + Status: v1.PodStatus{ + Phase: v1.PodUnknown, + }, + }, + expected: true, + }, + } + for _, test := range testCases { + result := shouldPodBeInEndpoints(test.pod) + if result != test.expected { + t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result) + } + } +}