From df2cbd48778908b069401f880e4a44f7464ec676 Mon Sep 17 00:00:00 2001 From: Yu-Ju Hong Date: Thu, 16 Apr 2015 17:37:57 -0700 Subject: [PATCH] Prioritize deleting the non-running pods when reducing replicas This changes instructs the replication controller to delete replicas in the order of "unscheduled (pending)", "scheduled (pending)", and "scheduled (running)" pods. This is less disruptive than deleting random pods. --- pkg/api/resource_helpers.go | 10 ++++ pkg/controller/replication_controller.go | 55 ++++++++++++++----- pkg/controller/replication_controller_test.go | 52 ++++++++++++++++++ pkg/service/endpoints_controller.go | 9 +-- 4 files changed, 104 insertions(+), 22 deletions(-) diff --git a/pkg/api/resource_helpers.go b/pkg/api/resource_helpers.go index 7dff8b9412..e4f8f7edeb 100644 --- a/pkg/api/resource_helpers.go +++ b/pkg/api/resource_helpers.go @@ -58,3 +58,13 @@ func GetExistingContainerStatus(statuses []ContainerStatus, name string) Contain } return ContainerStatus{} } + +// IsPodReady retruns true if a pod is ready; false otherwise. +func IsPodReady(pod *Pod) bool { + for _, c := range pod.Status.Conditions { + if c.Type == PodReady && c.Status == ConditionTrue { + return true + } + } + return false +} diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index b74578261e..0e43b90991 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -18,6 +18,7 @@ package controller import ( "fmt" + "sort" "sync" "time" @@ -163,7 +164,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) { if !ok { if status, ok := event.Object.(*api.Status); ok { if status.Status == api.StatusFailure { - glog.Errorf("failed to watch: %v", status) + glog.Errorf("Failed to watch: %v", status) // Clear resource version here, as above, this won't hurt consistency, but we // should consider introspecting more carefully here. (or make the apiserver smarter) // "why not both?" @@ -178,7 +179,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) { *resourceVersion = rc.ResourceVersion // Sync even if this is a deletion event, to ensure that we leave // it in the desired state. - glog.V(4).Infof("About to sync from watch: %v", rc.Name) + glog.V(4).Infof("About to sync from watch: %q", rc.Name) if err := rm.syncHandler(*rc); err != nil { util.HandleError(fmt.Errorf("unexpected sync error: %v", err)) } @@ -186,33 +187,54 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) { } } -// Helper function. Also used in pkg/registry/controller, for now. -func FilterActivePods(pods []api.Pod) []*api.Pod { +// filterActivePods returns pods that have not terminated. +func filterActivePods(pods []api.Pod) []*api.Pod { var result []*api.Pod - for i := range pods { - value := &pods[i] + for _, value := range pods { if api.PodSucceeded != value.Status.Phase && api.PodFailed != value.Status.Phase { - result = append(result, value) + result = append(result, &value) } } return result } +type activePods []*api.Pod + +func (s activePods) Len() int { return len(s) } +func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s activePods) Less(i, j int) bool { + // Unassigned < assigned + if s[i].Spec.Host == "" && s[j].Spec.Host != "" { + return true + } + // PodPending < PodUnknown < PodRunning + m := map[api.PodPhase]int{api.PodPending: 0, api.PodUnknown: 1, api.PodRunning: 2} + if m[s[i].Status.Phase] != m[s[j].Status.Phase] { + return m[s[i].Status.Phase] < m[s[j].Status.Phase] + } + // Not ready < ready + if !api.IsPodReady(s[i]) && api.IsPodReady(s[j]) { + return true + } + return false +} + func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error { s := labels.Set(controller.Spec.Selector).AsSelector() podList, err := rm.kubeClient.Pods(controller.Namespace).List(s) if err != nil { return err } - filteredList := FilterActivePods(podList.Items) - activePods := len(filteredList) - diff := activePods - controller.Spec.Replicas + filteredList := filterActivePods(podList.Items) + numActivePods := len(filteredList) + diff := numActivePods - controller.Spec.Replicas if diff < 0 { diff *= -1 wait := sync.WaitGroup{} wait.Add(diff) - glog.V(2).Infof("Too few \"%s\" replicas, creating %d\n", controller.Name, diff) + glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff) for i := 0; i < diff; i++ { go func() { defer wait.Done() @@ -221,7 +243,12 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati } wait.Wait() } else if diff > 0 { - glog.V(2).Infof("Too many \"%s\" replicas, deleting %d\n", controller.Name, diff) + glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff) + // Sort the pods in the order such that not-ready < ready, unscheduled + // < scheduled, and pending < running. This ensures that we delete pods + // in the earlier stages whenever possible. + sort.Sort(activePods(filteredList)) + wait := sync.WaitGroup{} wait.Add(diff) for i := 0; i < diff; i++ { @@ -232,8 +259,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati } wait.Wait() } - if controller.Status.Replicas != activePods { - controller.Status.Replicas = activePods + if controller.Status.Replicas != numActivePods { + controller.Status.Replicas = numActivePods _, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller) if err != nil { return err diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 8adb7094ba..aac5fc7bcb 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -18,8 +18,11 @@ package controller import ( "fmt" + "math/rand" "net/http" "net/http/httptest" + "reflect" + "sort" "sync" "testing" "time" @@ -375,3 +378,52 @@ func TestWatchControllers(t *testing.T) { t.Errorf("Expected 1 call but got 0") } } + +func TestSortingActivePods(t *testing.T) { + numPods := 5 + podList := newPodList(numPods) + pods := make([]*api.Pod, len(podList.Items)) + for i := range podList.Items { + pods[i] = &podList.Items[i] + } + // pods[0] is not scheduled yet. + pods[0].Spec.Host = "" + pods[0].Status.Phase = api.PodPending + // pods[1] is scheduled but pending. + pods[1].Spec.Host = "bar" + pods[1].Status.Phase = api.PodPending + // pods[2] is unknown. + pods[2].Spec.Host = "foo" + pods[2].Status.Phase = api.PodUnknown + // pods[3] is running but not ready. + pods[3].Spec.Host = "foo" + pods[3].Status.Phase = api.PodRunning + // pods[4] is running and ready. + pods[4].Spec.Host = "foo" + pods[4].Status.Phase = api.PodRunning + pods[4].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}} + + getOrder := func(pods []*api.Pod) []string { + names := make([]string, len(pods)) + for i := range pods { + names[i] = pods[i].Name + } + return names + } + + expected := getOrder(pods) + + for i := 0; i < 20; i++ { + idx := rand.Perm(numPods) + randomizedPods := make([]*api.Pod, numPods) + for j := 0; j < numPods; j++ { + randomizedPods[j] = pods[idx[j]] + } + sort.Sort(activePods(randomizedPods)) + actual := getOrder(randomizedPods) + + if !reflect.DeepEqual(actual, expected) { + t.Errorf("expected %v, got %v", expected, actual) + } + } +} diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 1be624dc97..b5c0f31799 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -315,14 +315,7 @@ func (e *EndpointController) syncService(key string) { continue } - inService := false - for _, c := range pod.Status.Conditions { - if c.Type == api.PodReady && c.Status == api.ConditionTrue { - inService = true - break - } - } - if !inService { + if !api.IsPodReady(pod) { glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) continue }