diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b85fb67a06..66dc993acf 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -772,18 +772,21 @@ func (dc *DeploymentController) rsAndPodsWithHashKeySynced(deployment *extension } syncedRSList = append(syncedRSList, *syncedRS) } - syncedPodList, err := deploymentutil.ListPods(deployment, - func(namespace string, options api.ListOptions) (*api.PodList, error) { - podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) - return &podList, err - }) - + syncedPodList, err := dc.listPods(deployment) if err != nil { return nil, nil, err } return syncedRSList, syncedPodList, nil } +func (dc *DeploymentController) listPods(deployment *extensions.Deployment) (*api.PodList, error) { + return deploymentutil.ListPods(deployment, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + return &podList, err + }) +} + // addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: // 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created // 2. Add hash label to all pods this rs owns, wait until replicaset controller reports rs.Status.FullyLabeledReplicas equal to the desired number of replicas @@ -977,6 +980,14 @@ func (dc *DeploymentController) reconcileNewReplicaSet(allRSs []*extensions.Repl return scaled, err } +func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) { + podList, err := dc.listPods(deployment) + if err != nil { + return 0, err + } + return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds) +} + func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs) if oldPodsCount == 0 { @@ -986,7 +997,8 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep minReadySeconds := deployment.Spec.MinReadySeconds allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{newRS}, minReadySeconds) + // TODO: use dc.getAvailablePodsForReplicaSets instead + newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds) if err != nil { return false, fmt.Errorf("could not find available pods: %v", err) } @@ -1068,7 +1080,8 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re // cannot scale down this replica set. continue } - readyPodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, []*extensions.ReplicaSet{targetRS}, 0) + // TODO: use dc.getAvailablePodsForReplicaSets instead + readyPodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, 0) if err != nil { return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err) } @@ -1104,7 +1117,8 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [ minAvailable := deployment.Spec.Replicas - maxUnavailable minReadySeconds := deployment.Spec.MinReadySeconds // Find the number of ready pods. - availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, allRSs, minReadySeconds) + // TODO: use dc.getAvailablePodsForReplicaSets instead + availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds) if err != nil { return 0, fmt.Errorf("could not find available pods: %v", err) } @@ -1220,7 +1234,7 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, totalActualReplicas = deploymentutil.GetActualReplicaCountForReplicaSets(allRSs) updatedReplicas = deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}) minReadySeconds := deployment.Spec.MinReadySeconds - availableReplicas, err = deploymentutil.GetAvailablePodsForReplicaSets(dc.client, allRSs, minReadySeconds) + availableReplicas, err = dc.getAvailablePodsForReplicaSets(deployment, allRSs, minReadySeconds) if err != nil { err = fmt.Errorf("failed to count available pods: %v", err) return diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 0dc92fb934..f9455979cd 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -54,13 +54,14 @@ func newRSWithStatus(name string, specReplicas, statusReplicas int, selector map return rs } -func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOrString) exp.Deployment { +func deployment(name string, replicas int, maxSurge, maxUnavailable intstr.IntOrString, selector map[string]string) exp.Deployment { return exp.Deployment{ ObjectMeta: api.ObjectMeta{ Name: name, }, Spec: exp.DeploymentSpec{ Replicas: int32(replicas), + Selector: &unversioned.LabelSelector{MatchLabels: selector}, Strategy: exp.DeploymentStrategy{ Type: exp.RollingUpdateDeploymentStrategyType, RollingUpdate: &exp.RollingUpdateDeployment{ @@ -190,7 +191,7 @@ func TestDeploymentController_reconcileNewReplicaSet(t *testing.T) { newRS := rs("foo-v2", test.newReplicas, nil) oldRS := rs("foo-v2", test.oldReplicas, nil) allRSs := []*exp.ReplicaSet{newRS, oldRS} - deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0)) + deployment := deployment("foo", test.deploymentReplicas, test.maxSurge, intstr.FromInt(0), nil) fake := fake.Clientset{} controller := &DeploymentController{ client: &fake, @@ -293,7 +294,7 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) { oldRSs := []*exp.ReplicaSet{oldRS} allRSs := []*exp.ReplicaSet{oldRS, newRS} - deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable) + deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, newSelector) fakeClientset := fake.Clientset{} fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { switch action.(type) { @@ -430,7 +431,7 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) { t.Logf("executing scenario %d", i) oldRS := rs("foo-v2", test.oldReplicas, nil) oldRSs := []*exp.ReplicaSet{oldRS} - deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2)) + deployment := deployment("foo", 10, intstr.FromInt(2), intstr.FromInt(2), nil) fakeClientset := fake.Clientset{} fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { switch action.(type) { @@ -540,7 +541,7 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing oldRS := rs("foo-v2", test.oldReplicas, nil) allRSs := []*exp.ReplicaSet{oldRS} oldRSs := []*exp.ReplicaSet{oldRS} - deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable) + deployment := deployment("foo", test.deploymentReplicas, intstr.FromInt(0), test.maxUnavailable, map[string]string{"foo": "bar"}) fakeClientset := fake.Clientset{} fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { switch action.(type) { @@ -783,12 +784,10 @@ func TestSyncDeploymentCreatesReplicaSet(t *testing.T) { // then is updated to 1 replica rs := newReplicaSet(d, "deploymentrs-4186632231", 0) updatedRS := newReplicaSet(d, "deploymentrs-4186632231", 1) - opt := newListOptions() f.expectCreateRSAction(rs) f.expectUpdateDeploymentAction(d) f.expectUpdateRSAction(updatedRS) - f.expectListPodAction(rs.Namespace, opt) f.expectUpdateDeploymentAction(d) f.run(getKey(d, t)) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 192648bf8c..0442da4290 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -34,6 +34,7 @@ import ( intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" + rsutil "k8s.io/kubernetes/pkg/util/replicaset" "k8s.io/kubernetes/pkg/util/wait" ) @@ -314,23 +315,42 @@ func GetActualReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) i return totalReplicaCount } -// Returns the number of available pods corresponding to the given replica sets. -func GetAvailablePodsForReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) { - allPods, err := GetPodsForReplicaSets(c, rss) +// GetAvailablePodsForReplicaSets returns the number of available pods (listed from clientset) corresponding to the given replica sets. +func GetAvailablePodsForReplicaSets(c clientset.Interface, deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) { + podList, err := listPods(deployment, c) if err != nil { return 0, err } - return getReadyPodsCount(allPods, minReadySeconds), nil + return CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds) } -func getReadyPodsCount(pods []api.Pod, minReadySeconds int32) int32 { - readyPodCount := int32(0) +// CountAvailablePodsForReplicaSets returns the number of available pods corresponding to the given pod list and replica sets. +// Note that the input pod list should be the pods targeted by the deployment of input replica sets. +func CountAvailablePodsForReplicaSets(podList *api.PodList, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) { + rsPods, err := filterPodsMatchingReplicaSets(rss, podList) + if err != nil { + return 0, err + } + return countAvailablePods(rsPods, minReadySeconds), nil +} + +// GetAvailablePodsForDeployment returns the number of available pods (listed from clientset) corresponding to the given deployment. +func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions.Deployment, minReadySeconds int32) (int32, error) { + podList, err := listPods(deployment, c) + if err != nil { + return 0, err + } + return countAvailablePods(podList.Items, minReadySeconds), nil +} + +func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 { + availablePodCount := int32(0) for _, pod := range pods { if IsPodAvailable(&pod, minReadySeconds) { - readyPodCount++ + availablePodCount++ } } - return readyPodCount + return availablePodCount } func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool { @@ -354,29 +374,20 @@ func IsPodAvailable(pod *api.Pod, minReadySeconds int32) bool { return false } -func GetPodsForReplicaSets(c clientset.Interface, replicaSets []*extensions.ReplicaSet) ([]api.Pod, error) { - allPods := map[string]api.Pod{} +// filterPodsMatchingReplicaSets filters the given pod list and only return the ones targeted by the input replicasets +func filterPodsMatchingReplicaSets(replicaSets []*extensions.ReplicaSet, podList *api.PodList) ([]api.Pod, error) { + rsPods := []api.Pod{} for _, rs := range replicaSets { - if rs != nil { - selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - options := api.ListOptions{LabelSelector: selector} - podList, err := c.Core().Pods(rs.ObjectMeta.Namespace).List(options) - if err != nil { - return nil, fmt.Errorf("error listing pods: %v", err) - } - for _, pod := range podList.Items { - allPods[pod.Name] = pod - } + matchingFunc, err := rsutil.MatchingPodsFunc(rs) + if err != nil { + return nil, err } + if matchingFunc == nil { + continue + } + rsPods = append(rsPods, podutil.Filter(podList, matchingFunc)...) } - requiredPods := []api.Pod{} - for _, pod := range allPods { - requiredPods = append(requiredPods, pod) - } - return requiredPods, nil + return rsPods, nil } // Revision returns the revision number of the input replica set diff --git a/pkg/util/deployment/deployment_test.go b/pkg/util/deployment/deployment_test.go index 61d3a16311..ef26617d7c 100644 --- a/pkg/util/deployment/deployment_test.go +++ b/pkg/util/deployment/deployment_test.go @@ -96,7 +96,7 @@ func newPod(now time.Time, ready bool, beforeSec int) api.Pod { } } -func TestGetReadyPodsCount(t *testing.T) { +func TestCountAvailablePods(t *testing.T) { now := time.Now() tests := []struct { pods []api.Pod @@ -124,7 +124,7 @@ func TestGetReadyPodsCount(t *testing.T) { } for _, test := range tests { - if count := getReadyPodsCount(test.pods, int32(test.minReadySeconds)); int(count) != test.expected { + if count := countAvailablePods(test.pods, int32(test.minReadySeconds)); int(count) != test.expected { t.Errorf("Pods = %#v, minReadySeconds = %d, expected %d, got %d", test.pods, test.minReadySeconds, test.expected, count) } } diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index b59e15dbc5..9c57aaebbc 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -87,3 +87,14 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, // if the error is nil and podUpdated is true, the returned pod contains the applied update. return pod, podUpdated, err } + +// Filter uses the input function f to filter the given pod list, and return the filtered pods +func Filter(podList *api.PodList, f func(api.Pod) bool) []api.Pod { + pods := make([]api.Pod, 0) + for _, p := range podList.Items { + if f(p) { + pods = append(pods, p) + } + } + return pods +} diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 1705d9318b..e5dd26517b 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -23,8 +23,10 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" unversionedextensions "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/extensions/unversioned" + "k8s.io/kubernetes/pkg/labels" errorsutil "k8s.io/kubernetes/pkg/util/errors" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" @@ -91,3 +93,18 @@ func GetPodTemplateSpecHash(rs extensions.ReplicaSet) string { Spec: rs.Spec.Template.Spec, })) } + +// MatchingPodsFunc returns a filter function for pods with matching labels +func MatchingPodsFunc(rs *extensions.ReplicaSet) (func(api.Pod) bool, error) { + if rs == nil { + return nil, nil + } + selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector) + if err != nil { + return nil, fmt.Errorf("invalid label selector: %v", err) + } + return func(pod api.Pod) bool { + podLabelsSelector := labels.Set(pod.ObjectMeta.Labels) + return selector.Matches(podLabelsSelector) + }, nil +} diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 4d6567fe3c..e65e5e12d6 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -2886,18 +2886,18 @@ func WaitForDeploymentStatus(c clientset.Interface, ns, deploymentName string, d } } totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs) - totalAvailable, err := deploymentutil.GetAvailablePodsForReplicaSets(c, allRSs, minReadySeconds) + totalAvailable, err := deploymentutil.GetAvailablePodsForDeployment(c, deployment, minReadySeconds) if err != nil { return false, err } if totalCreated > maxCreated { logReplicaSetsOfDeployment(deployment, allOldRSs, newRS) - logPodsOfReplicaSets(c, allRSs, minReadySeconds) + logPodsOfDeployment(c, deployment, minReadySeconds) return false, fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated) } if totalAvailable < minAvailable { logReplicaSetsOfDeployment(deployment, allOldRSs, newRS) - logPodsOfReplicaSets(c, allRSs, minReadySeconds) + logPodsOfDeployment(c, deployment, minReadySeconds) return false, fmt.Errorf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable) } @@ -2913,7 +2913,7 @@ func WaitForDeploymentStatus(c clientset.Interface, ns, deploymentName string, d if err == wait.ErrWaitTimeout { logReplicaSetsOfDeployment(deployment, allOldRSs, newRS) - logPodsOfReplicaSets(c, allRSs, minReadySeconds) + logPodsOfDeployment(c, deployment, minReadySeconds) } if err != nil { return fmt.Errorf("error waiting for deployment %s status to match expectation: %v", deploymentName, err) @@ -3062,10 +3062,17 @@ func WaitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string return deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) { return c.Extensions().Deployments(ns).Get(deploymentName) }, desiredGeneration, Poll, 1*time.Minute) } -func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int32) { - allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss) +func logPodsOfDeployment(c clientset.Interface, deployment *extensions.Deployment, minReadySeconds int32) { + podList, err := deploymentutil.ListPods(deployment, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + return c.Core().Pods(namespace).List(options) + }) + if err != nil { + Logf("Failed to list pods of deployment %s: %v", deployment.Name, err) + return + } if err == nil { - for _, pod := range allPods { + for _, pod := range podList.Items { availability := "not available" if deploymentutil.IsPodAvailable(&pod, minReadySeconds) { availability = "available"