Pods which exits and won't restart should not be in the Endpoints.NotReadyAddresses

pull/6/head
xiangpengzhao 2017-06-14 15:54:33 +08:00
parent 4ae3b032f4
commit d59c128904
2 changed files with 243 additions and 1 deletions

View File

@ -515,7 +515,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
Ports: []v1.EndpointPort{epp},
})
readyEps++
} else {
} else if shouldPodBeInEndpoints(pod) {
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
@ -525,3 +525,14 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
}
return subsets, readyEps, notReadyEps
}
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
switch pod.Spec.RestartPolicy {
case v1.RestartPolicyNever:
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
case v1.RestartPolicyOnFailure:
return pod.Status.Phase != v1.PodSucceeded
default:
return true
}
}

View File

@ -76,6 +76,38 @@ func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotRea
}
}
func addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(store cache.Store, namespace string, nPods int, nPorts int, restartPolicy v1.RestartPolicy, podPhase v1.PodPhase) {
for i := 0; i < nPods; i++ {
p := &v1.Pod{
TypeMeta: metav1.TypeMeta{APIVersion: api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()},
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("pod%d", i),
Labels: map[string]string{"foo": "bar"},
},
Spec: v1.PodSpec{
RestartPolicy: restartPolicy,
Containers: []v1.Container{{Ports: []v1.ContainerPort{}}},
},
Status: v1.PodStatus{
PodIP: fmt.Sprintf("1.2.3.%d", 4+i),
Phase: podPhase,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionFalse,
},
},
},
}
for j := 0; j < nPorts; j++ {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
v1.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: int32(8080 + j)})
}
store.Add(p)
}
}
type serverResponse struct {
statusCode int
obj interface{}
@ -661,3 +693,202 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
endpointsHandler.ValidateRequestCount(t, 1)
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFailed(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
})
addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodFailed)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucceeded(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
})
addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyNever, v1.PodSucceeded)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhaseSucceeded(t *testing.T) {
ns := "other"
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.endpointsStore.Add(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
})
addNotReadyPodsWithSpecifiedRestartPolicyAndPhase(endpoints.podStore, ns, 1, 1, v1.RestartPolicyOnFailure, v1.PodSucceeded)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
// Just list all of the 3 false cases and 3 of the 12 true cases.
func TestShouldPodBeInEndpoints(t *testing.T) {
testCases := []struct {
name string
pod *v1.Pod
expected bool
}{
// Pod should not be in endpoints cases:
{
name: "Failed pod with Never RestartPolicy",
pod: &v1.Pod{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
},
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
},
expected: false,
},
{
name: "Succeeded pod with Never RestartPolicy",
pod: &v1.Pod{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
},
expected: false,
},
{
name: "Succeeded pod with OnFailure RestartPolicy",
pod: &v1.Pod{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
},
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
},
expected: false,
},
// Pod should be in endpoints cases:
{
name: "Failed pod with Always RestartPolicy",
pod: &v1.Pod{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyAlways,
},
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
},
expected: true,
},
{
name: "Pending pod with Never RestartPolicy",
pod: &v1.Pod{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyNever,
},
Status: v1.PodStatus{
Phase: v1.PodPending,
},
},
expected: true,
},
{
name: "Unknown pod with OnFailure RestartPolicy",
pod: &v1.Pod{
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicyOnFailure,
},
Status: v1.PodStatus{
Phase: v1.PodUnknown,
},
},
expected: true,
},
}
for _, test := range testCases {
result := shouldPodBeInEndpoints(test.pod)
if result != test.expected {
t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
}
}
}