From 985ba931b1dedeec1462c0b248687b496715407a Mon Sep 17 00:00:00 2001 From: Krzysztof Jastrzebski Date: Tue, 4 Sep 2018 20:16:48 +0200 Subject: [PATCH] Use informer cache instead of active pod gets in HPA controller. --- .../app/autoscaling.go | 13 +- pkg/controller/podautoscaler/BUILD | 3 +- pkg/controller/podautoscaler/horizontal.go | 29 +++- .../podautoscaler/horizontal_test.go | 124 ++++++++++++------ .../podautoscaler/legacy_horizontal_test.go | 8 +- .../legacy_replica_calculator_test.go | 14 +- .../podautoscaler/replica_calculator.go | 36 ++--- .../podautoscaler/replica_calculator_test.go | 38 ++++-- 8 files changed, 178 insertions(+), 87 deletions(-) diff --git a/cmd/kube-controller-manager/app/autoscaling.go b/cmd/kube-controller-manager/app/autoscaling.go index 76f80d9ae1..e8b7decfdd 100644 --- a/cmd/kube-controller-manager/app/autoscaling.go +++ b/cmd/kube-controller-manager/app/autoscaling.go @@ -80,22 +80,19 @@ func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient me return nil, false, err } - replicaCalc := podautoscaler.NewReplicaCalculator( - metricsClient, - hpaClient.CoreV1(), - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, - ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, - ) go podautoscaler.NewHorizontalController( hpaClient.CoreV1(), scaleClient, hpaClient.AutoscalingV1(), ctx.RESTMapper, - replicaCalc, + metricsClient, ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), + ctx.InformerFactory.Core().V1().Pods(), ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, + ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, + ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, + ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, ).Run(ctx.Stop) return nil, true, nil } diff --git a/pkg/controller/podautoscaler/BUILD b/pkg/controller/podautoscaler/BUILD index 0e8237cd88..801a5a5226 100644 --- a/pkg/controller/podautoscaler/BUILD +++ b/pkg/controller/podautoscaler/BUILD @@ -30,10 +30,12 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/autoscaling/v1:go_default_library", + "//staging/src/k8s.io/client-go/informers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/autoscaling/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/listers/autoscaling/v1:go_default_library", + "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/scale:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", @@ -83,7 +85,6 @@ go_test( "//staging/src/k8s.io/metrics/pkg/client/clientset/versioned/fake:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/custom_metrics/fake:go_default_library", "//staging/src/k8s.io/metrics/pkg/client/external_metrics/fake:go_default_library", - "//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", "//vendor/k8s.io/heapster/metrics/api/v1/types:go_default_library", diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go index 289a465f96..b899f26786 100644 --- a/pkg/controller/podautoscaler/horizontal.go +++ b/pkg/controller/podautoscaler/horizontal.go @@ -36,16 +36,19 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" autoscalinginformers "k8s.io/client-go/informers/autoscaling/v1" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/scheme" autoscalingclient "k8s.io/client-go/kubernetes/typed/autoscaling/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1" autoscalinglisters "k8s.io/client-go/listers/autoscaling/v1" + corelisters "k8s.io/client-go/listers/core/v1" scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/controller" + metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" ) var ( @@ -76,6 +79,11 @@ type HorizontalController struct { hpaLister autoscalinglisters.HorizontalPodAutoscalerLister hpaListerSynced cache.InformerSynced + // podLister is able to list/get Pods from the shared cache from the informer passed in to + // NewHorizontalController. + podLister corelisters.PodLister + podListerSynced cache.InformerSynced + // Controllers that need to be synced queue workqueue.RateLimitingInterface @@ -89,10 +97,14 @@ func NewHorizontalController( scaleNamespacer scaleclient.ScalesGetter, hpaNamespacer autoscalingclient.HorizontalPodAutoscalersGetter, mapper apimeta.RESTMapper, - replicaCalc *ReplicaCalculator, + metricsClient metricsclient.MetricsClient, hpaInformer autoscalinginformers.HorizontalPodAutoscalerInformer, + podInformer coreinformers.PodInformer, resyncPeriod time.Duration, downscaleStabilisationWindow time.Duration, + tolerance float64, + cpuInitializationPeriod, + delayOfInitialReadinessStatus time.Duration, ) *HorizontalController { broadcaster := record.NewBroadcaster() @@ -101,7 +113,6 @@ func NewHorizontalController( recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "horizontal-pod-autoscaler"}) hpaController := &HorizontalController{ - replicaCalc: replicaCalc, eventRecorder: recorder, scaleNamespacer: scaleNamespacer, hpaNamespacer: hpaNamespacer, @@ -122,6 +133,18 @@ func NewHorizontalController( hpaController.hpaLister = hpaInformer.Lister() hpaController.hpaListerSynced = hpaInformer.Informer().HasSynced + hpaController.podLister = podInformer.Lister() + hpaController.podListerSynced = podInformer.Informer().HasSynced + + replicaCalc := NewReplicaCalculator( + metricsClient, + hpaController.podLister, + tolerance, + cpuInitializationPeriod, + delayOfInitialReadinessStatus, + ) + hpaController.replicaCalc = replicaCalc + return hpaController } @@ -133,7 +156,7 @@ func (a *HorizontalController) Run(stopCh <-chan struct{}) { glog.Infof("Starting HPA controller") defer glog.Infof("Shutting down HPA controller") - if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced) { + if !controller.WaitForCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) { return } diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go index 8d064501c5..939fe8b7b5 100644 --- a/pkg/controller/podautoscaler/horizontal_test.go +++ b/pkg/controller/podautoscaler/horizontal_test.go @@ -51,7 +51,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/golang/glog" _ "k8s.io/kubernetes/pkg/apis/autoscaling/install" _ "k8s.io/kubernetes/pkg/apis/extensions/install" ) @@ -662,8 +661,6 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform return true, obj, nil }) - replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) - informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc()) defaultDownscalestabilizationWindow := 5 * time.Minute @@ -672,10 +669,14 @@ func (tc *testCase) setupController(t *testing.T) (*HorizontalController, inform testScaleClient, testClient.Autoscaling(), testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme), - replicaCalc, + metricsClient, informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), + informerFactory.Core().V1().Pods(), controller.NoResyncPeriodFunc(), defaultDownscalestabilizationWindow, + defaultTestingTolerance, + defaultTestingCpuInitializationPeriod, + defaultTestingDelayOfInitialReadinessStatus, ) hpaController.hpaListerSynced = alwaysReady if tc.recommendations != nil { @@ -715,7 +716,6 @@ func (tc *testCase) runTestWithController(t *testing.T, hpaController *Horizonta func (tc *testCase) runTest(t *testing.T) { hpaController, informerFactory := tc.setupController(t) tc.runTestWithController(t, hpaController, informerFactory) - glog.Errorf("recommendations: %+v", hpaController.recommendations) } func TestScaleUp(t *testing.T) { @@ -2227,11 +2227,12 @@ func TestScaleDownRCImmediately(t *testing.T) { } func TestAvoidUncessaryUpdates(t *testing.T) { + now := metav1.Time{Time: time.Now().Add(-time.Hour)} tc := testCase{ minReplicas: 2, maxReplicas: 6, - initialReplicas: 3, - expectedDesiredReplicas: 3, + initialReplicas: 2, + expectedDesiredReplicas: 2, CPUTarget: 30, CPUCurrent: 40, verifyCPUCurrent: true, @@ -2239,41 +2240,95 @@ func TestAvoidUncessaryUpdates(t *testing.T) { reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")}, reportedPodStartTime: []metav1.Time{coolCpuCreationTime(), hotCpuCreationTime(), hotCpuCreationTime()}, useMetricsAPI: true, + lastScaleTime: &now, } testClient, _, _, _, _ := tc.prepareTestClient(t) tc.testClient = testClient - var savedHPA *autoscalingv1.HorizontalPodAutoscaler testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) { tc.Lock() defer tc.Unlock() + // fake out the verification logic and mark that we're done processing + go func() { + // wait a tick and then mark that we're finished (otherwise, we have no + // way to indicate that we're finished, because the function decides not to do anything) + time.Sleep(1 * time.Second) + tc.statusUpdated = true + tc.processed <- "test-hpa" + }() - if savedHPA != nil { - // fake out the verification logic and mark that we're done processing - go func() { - // wait a tick and then mark that we're finished (otherwise, we have no - // way to indicate that we're finished, because the function decides not to do anything) - time.Sleep(1 * time.Second) - tc.statusUpdated = true - tc.processed <- "test-hpa" - }() - return true, &autoscalingv1.HorizontalPodAutoscalerList{ - Items: []autoscalingv1.HorizontalPodAutoscaler{*savedHPA}, - }, nil + quantity := resource.MustParse("400m") + obj := &autoscalingv2.HorizontalPodAutoscalerList{ + Items: []autoscalingv2.HorizontalPodAutoscaler{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-hpa", + Namespace: "test-namespace", + SelfLink: "experimental/v1/namespaces/test-namespace/horizontalpodautoscalers/test-hpa", + }, + Spec: autoscalingv2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv2.CrossVersionObjectReference{ + Kind: "ReplicationController", + Name: "test-rc", + APIVersion: "v1", + }, + + MinReplicas: &tc.minReplicas, + MaxReplicas: tc.maxReplicas, + }, + Status: autoscalingv2.HorizontalPodAutoscalerStatus{ + CurrentReplicas: tc.initialReplicas, + DesiredReplicas: tc.initialReplicas, + LastScaleTime: tc.lastScaleTime, + CurrentMetrics: []autoscalingv2.MetricStatus{ + { + Type: autoscalingv2.ResourceMetricSourceType, + Resource: &autoscalingv2.ResourceMetricStatus{ + Name: v1.ResourceCPU, + Current: autoscalingv2.MetricValueStatus{ + AverageValue: &quantity, + AverageUtilization: &tc.CPUCurrent, + }, + }, + }, + }, + Conditions: []autoscalingv2.HorizontalPodAutoscalerCondition{ + { + Type: autoscalingv2.AbleToScale, + Status: v1.ConditionTrue, + LastTransitionTime: *tc.lastScaleTime, + Reason: "ReadyForNewScale", + Message: "recommended size matches current size", + }, + { + Type: autoscalingv2.ScalingActive, + Status: v1.ConditionTrue, + LastTransitionTime: *tc.lastScaleTime, + Reason: "ValidMetricFound", + Message: "the HPA was able to successfully calculate a replica count from cpu resource utilization (percentage of request)", + }, + { + Type: autoscalingv2.ScalingLimited, + Status: v1.ConditionTrue, + LastTransitionTime: *tc.lastScaleTime, + Reason: "TooFewReplicas", + Message: "the desired replica count is more than the maximum replica count", + }, + }, + }, + }, + }, + } + // and... convert to autoscaling v1 to return the right type + objv1, err := unsafeConvertToVersionVia(obj, autoscalingv1.SchemeGroupVersion) + if err != nil { + return true, nil, err } - // fallthrough - return false, nil, nil + return true, objv1, nil }) testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) { tc.Lock() defer tc.Unlock() - - if savedHPA == nil { - // save the HPA and return it - savedHPA = action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler) - return true, savedHPA, nil - } - assert.Fail(t, "should not have attempted to update the HPA when nothing changed") // mark that we've processed this HPA tc.processed <- "" @@ -2281,17 +2336,6 @@ func TestAvoidUncessaryUpdates(t *testing.T) { }) controller, informerFactory := tc.setupController(t) - - // fake an initial processing loop to populate savedHPA - initialHPAs, err := testClient.Autoscaling().HorizontalPodAutoscalers("test-namespace").List(metav1.ListOptions{}) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - if err := controller.reconcileAutoscaler(&initialHPAs.Items[0], ""); err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // actually run the test tc.runTestWithController(t, controller, informerFactory) } diff --git a/pkg/controller/podautoscaler/legacy_horizontal_test.go b/pkg/controller/podautoscaler/legacy_horizontal_test.go index bc681b543a..f29e19012c 100644 --- a/pkg/controller/podautoscaler/legacy_horizontal_test.go +++ b/pkg/controller/podautoscaler/legacy_horizontal_test.go @@ -485,8 +485,6 @@ func (tc *legacyTestCase) runTest(t *testing.T) { return true, obj, nil }) - replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) - informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc()) defaultDownscaleStabilisationWindow := 5 * time.Minute @@ -495,10 +493,14 @@ func (tc *legacyTestCase) runTest(t *testing.T) { testScaleClient, testClient.Autoscaling(), testrestmapper.TestOnlyStaticRESTMapper(legacyscheme.Scheme), - replicaCalc, + metricsClient, informerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), + informerFactory.Core().V1().Pods(), controller.NoResyncPeriodFunc(), defaultDownscaleStabilisationWindow, + defaultTestingTolerance, + defaultTestingCpuInitializationPeriod, + defaultTestingDelayOfInitialReadinessStatus, ) hpaController.hpaListerSynced = alwaysReady diff --git a/pkg/controller/podautoscaler/legacy_replica_calculator_test.go b/pkg/controller/podautoscaler/legacy_replica_calculator_test.go index 486ee5049e..083d2bdd25 100644 --- a/pkg/controller/podautoscaler/legacy_replica_calculator_test.go +++ b/pkg/controller/podautoscaler/legacy_replica_calculator_test.go @@ -29,9 +29,11 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" restclient "k8s.io/client-go/rest" core "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" heapster "k8s.io/heapster/metrics/api/v1/types" @@ -186,7 +188,17 @@ func (tc *legacyReplicaCalcTestCase) runTest(t *testing.T) { testClient := tc.prepareTestClient(t) metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort) - replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) + informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc()) + informer := informerFactory.Core().V1().Pods() + + replicaCalc := NewReplicaCalculator(metricsClient, informer.Lister(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) { + return + } selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: map[string]string{"name": podNamePrefix}, diff --git a/pkg/controller/podautoscaler/replica_calculator.go b/pkg/controller/podautoscaler/replica_calculator.go index 6823a838a2..a1eb82794f 100644 --- a/pkg/controller/podautoscaler/replica_calculator.go +++ b/pkg/controller/podautoscaler/replica_calculator.go @@ -26,7 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - v1coreclient "k8s.io/client-go/kubernetes/typed/core/v1" + corelisters "k8s.io/client-go/listers/core/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" ) @@ -41,16 +41,16 @@ const ( type ReplicaCalculator struct { metricsClient metricsclient.MetricsClient - podsGetter v1coreclient.PodsGetter + podLister corelisters.PodLister tolerance float64 cpuInitializationPeriod time.Duration delayOfInitialReadinessStatus time.Duration } -func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podsGetter v1coreclient.PodsGetter, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator { +func NewReplicaCalculator(metricsClient metricsclient.MetricsClient, podLister corelisters.PodLister, tolerance float64, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) *ReplicaCalculator { return &ReplicaCalculator{ metricsClient: metricsClient, - podsGetter: podsGetter, + podLister: podLister, tolerance: tolerance, cpuInitializationPeriod: cpuInitializationPeriod, delayOfInitialReadinessStatus: delayOfInitialReadinessStatus, @@ -64,20 +64,19 @@ func (c *ReplicaCalculator) GetResourceReplicas(currentReplicas int32, targetUti if err != nil { return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get metrics for resource %s: %v", resource, err) } - - podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + podList, err := c.podLister.Pods(namespace).List(selector) if err != nil { return 0, 0, 0, time.Time{}, fmt.Errorf("unable to get pods while calculating replica count: %v", err) } - itemsLen := len(podList.Items) + itemsLen := len(podList) if itemsLen == 0 { return 0, 0, 0, time.Time{}, fmt.Errorf("no pods returned by selector while calculating replica count") } - readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) + readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) removeMetricsForPods(metrics, ignoredPods) - requests, err := calculatePodRequests(podList.Items, resource) + requests, err := calculatePodRequests(podList, resource) if err != nil { return 0, 0, 0, time.Time{}, err } @@ -167,16 +166,17 @@ func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtili // calcPlainMetricReplicas calculates the desired replicas for plain (i.e. non-utilization percentage) metrics. func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { - podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + + podList, err := c.podLister.Pods(namespace).List(selector) if err != nil { return 0, 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) } - if len(podList.Items) == 0 { + if len(podList) == 0 { return 0, 0, fmt.Errorf("no pods returned by selector while calculating replica count") } - readyPodCount, ignoredPods, missingPods := groupPods(podList.Items, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) + readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) removeMetricsForPods(metrics, ignoredPods) if len(metrics) == 0 { @@ -261,19 +261,19 @@ func (c *ReplicaCalculator) GetObjectMetricReplicas(currentReplicas int32, targe // of this function. Make this function generic, so we don't repeat the same // logic in multiple places. func (c *ReplicaCalculator) getReadyPodsCount(namespace string, selector labels.Selector) (int64, error) { - podList, err := c.podsGetter.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector.String()}) + podList, err := c.podLister.Pods(namespace).List(selector) if err != nil { return 0, fmt.Errorf("unable to get pods while calculating replica count: %v", err) } - if len(podList.Items) == 0 { + if len(podList) == 0 { return 0, fmt.Errorf("no pods returned by selector while calculating replica count") } readyPodCount := 0 - for _, pod := range podList.Items { - if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(&pod) { + for _, pod := range podList { + if pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) { readyPodCount++ } } @@ -340,7 +340,7 @@ func (c *ReplicaCalculator) GetExternalPerPodMetricReplicas(currentReplicas int3 return replicaCount, utilization, timestamp, nil } -func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) { +func groupPods(pods []*v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1.ResourceName, cpuInitializationPeriod, delayOfInitialReadinessStatus time.Duration) (readyPodCount int, ignoredPods sets.String, missingPods sets.String) { missingPods = sets.NewString() ignoredPods = sets.NewString() for _, pod := range pods { @@ -377,7 +377,7 @@ func groupPods(pods []v1.Pod, metrics metricsclient.PodMetricsInfo, resource v1. return } -func calculatePodRequests(pods []v1.Pod, resource v1.ResourceName) (map[string]int64, error) { +func calculatePodRequests(pods []*v1.Pod, resource v1.ResourceName) (map[string]int64, error) { requests := make(map[string]int64, len(pods)) for _, pod := range pods { podSum := int64(0) diff --git a/pkg/controller/podautoscaler/replica_calculator_test.go b/pkg/controller/podautoscaler/replica_calculator_test.go index d8492c1c05..ff71498278 100644 --- a/pkg/controller/podautoscaler/replica_calculator_test.go +++ b/pkg/controller/podautoscaler/replica_calculator_test.go @@ -31,9 +31,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api/legacyscheme" + "k8s.io/kubernetes/pkg/controller" metricsclient "k8s.io/kubernetes/pkg/controller/podautoscaler/metrics" cmapi "k8s.io/metrics/pkg/apis/custom_metrics/v1beta2" emapi "k8s.io/metrics/pkg/apis/external_metrics/v1beta1" @@ -338,7 +340,17 @@ func (tc *replicaCalcTestCase) runTest(t *testing.T) { testClient, testMetricsClient, testCMClient, testEMClient := tc.prepareTestClient(t) metricsClient := metricsclient.NewRESTMetricsClient(testMetricsClient.MetricsV1beta1(), testCMClient, testEMClient) - replicaCalc := NewReplicaCalculator(metricsClient, testClient.Core(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) + informerFactory := informers.NewSharedInformerFactory(testClient, controller.NoResyncPeriodFunc()) + informer := informerFactory.Core().V1().Pods() + + replicaCalc := NewReplicaCalculator(metricsClient, informer.Lister(), defaultTestingTolerance, defaultTestingCpuInitializationPeriod, defaultTestingDelayOfInitialReadinessStatus) + + stop := make(chan struct{}) + defer close(stop) + informerFactory.Start(stop) + if !controller.WaitForCacheSync("HPA", stop, informer.Informer().HasSynced) { + return + } selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{ MatchLabels: map[string]string{"name": podNamePrefix}, @@ -1224,7 +1236,7 @@ func TestReplicaCalcComputedToleranceAlgImplementation(t *testing.T) { func TestGroupPods(t *testing.T) { tests := []struct { name string - pods []v1.Pod + pods []*v1.Pod metrics metricsclient.PodMetricsInfo resource v1.ResourceName expectReadyPodCount int @@ -1233,7 +1245,7 @@ func TestGroupPods(t *testing.T) { }{ { "void", - []v1.Pod{}, + []*v1.Pod{}, metricsclient.PodMetricsInfo{}, v1.ResourceCPU, 0, @@ -1242,7 +1254,7 @@ func TestGroupPods(t *testing.T) { }, { "count in a ready pod - memory", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "bentham", @@ -1262,7 +1274,7 @@ func TestGroupPods(t *testing.T) { }, { "ignore a pod without ready condition - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "lucretius", @@ -1285,7 +1297,7 @@ func TestGroupPods(t *testing.T) { }, { "count in a ready pod with fresh metrics during initialization period - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "bentham", @@ -1315,7 +1327,7 @@ func TestGroupPods(t *testing.T) { }, { "ignore a ready pod without fresh metrics during initialization period - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "bentham", @@ -1345,7 +1357,7 @@ func TestGroupPods(t *testing.T) { }, { "ignore an unready pod during initialization period - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "lucretius", @@ -1375,7 +1387,7 @@ func TestGroupPods(t *testing.T) { }, { "count in a ready pod without fresh metrics after initialization period - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "bentham", @@ -1406,7 +1418,7 @@ func TestGroupPods(t *testing.T) { { "count in an unready pod that was ready after initialization period - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "lucretius", @@ -1436,7 +1448,7 @@ func TestGroupPods(t *testing.T) { }, { "ignore pod that has never been ready after initialization period - CPU", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "lucretius", @@ -1466,7 +1478,7 @@ func TestGroupPods(t *testing.T) { }, { "a missing pod", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "epicurus", @@ -1487,7 +1499,7 @@ func TestGroupPods(t *testing.T) { }, { "several pods", - []v1.Pod{ + []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ Name: "lucretius",