mirror of https://github.com/k3s-io/k3s
Merge pull request #47078 from DirectXMan12/bug/only-update-when-needed
Automatic merge from submit-queue (batch tested with PRs 46979, 47078, 47138, 46916) HPA: only send updates when the status has changed This commit only sends updates if the status has actually changed. Since the HPA runs at a regular interval, this should reduce the volume of writes, especially on short HPA intervals with relatively constant metrics. Fixes #47077 **Release note**: ```release-note The HorizontalPodAutoscaler controller will now only send updates when it has new status information, reducing the number of writes caused by the controller. ```pull/6/head
commit
6e8d6acdb7
|
@ -32,6 +32,7 @@ go_library(
|
|||
"//pkg/controller:go_default_library",
|
||||
"//pkg/controller/podautoscaler/metrics:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -371,6 +372,12 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
|
|||
return fmt.Errorf("failed to convert the given HPA to %s: %v", autoscalingv2.SchemeGroupVersion.String(), err)
|
||||
}
|
||||
hpa := hpaRaw.(*autoscalingv2.HorizontalPodAutoscaler)
|
||||
hpaStatusOriginalRaw, err := api.Scheme.DeepCopy(&hpa.Status)
|
||||
if err != nil {
|
||||
a.eventRecorder.Event(hpav1Shared, v1.EventTypeWarning, "FailedConvertHPA", err.Error())
|
||||
return fmt.Errorf("failed to deep-copy the HPA status: %v", err)
|
||||
}
|
||||
hpaStatusOriginal := hpaStatusOriginalRaw.(*autoscalingv2.HorizontalPodAutoscalerStatus)
|
||||
|
||||
reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleTargetRef.Kind, hpa.Namespace, hpa.Spec.ScaleTargetRef.Name)
|
||||
|
||||
|
@ -378,7 +385,7 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
|
|||
if err != nil {
|
||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedGetScale", err.Error())
|
||||
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedGetScale", "the HPA controller was unable to get the target's current scale: %v", err)
|
||||
a.update(hpa)
|
||||
a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
|
||||
return fmt.Errorf("failed to query scale subresource for %s: %v", reference, err)
|
||||
}
|
||||
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededGetScale", "the HPA controller was able to get the target's current scale")
|
||||
|
@ -412,7 +419,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
|
|||
} else {
|
||||
metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics)
|
||||
if err != nil {
|
||||
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
|
||||
a.setCurrentReplicasInStatus(hpa, currentReplicas)
|
||||
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
a.eventRecorder.Event(hpa, v1.EventTypeWarning, "FailedComputeMetricsReplicas", err.Error())
|
||||
return fmt.Errorf("failed to compute desired number of replicas based on listed metrics for %s: %v", reference, err)
|
||||
}
|
||||
|
@ -489,7 +499,10 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
|
|||
if err != nil {
|
||||
a.eventRecorder.Eventf(hpa, v1.EventTypeWarning, "FailedRescale", "New size: %d; reason: %s; error: %v", desiredReplicas, rescaleReason, err.Error())
|
||||
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionFalse, "FailedUpdateScale", "the HPA controller was unable to update the target scale: %v", err)
|
||||
a.updateCurrentReplicasInStatus(hpa, currentReplicas)
|
||||
a.setCurrentReplicasInStatus(hpa, currentReplicas)
|
||||
if err := a.updateStatusIfNeeded(hpaStatusOriginal, hpa); err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
return fmt.Errorf("failed to rescale %s: %v", reference, err)
|
||||
}
|
||||
setCondition(hpa, autoscalingv2.AbleToScale, v1.ConditionTrue, "SucceededRescale", "the HPA controller was able to update the target scale to %d", desiredReplicas)
|
||||
|
@ -501,7 +514,8 @@ func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.Ho
|
|||
desiredReplicas = currentReplicas
|
||||
}
|
||||
|
||||
return a.updateStatusWithReplicas(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
|
||||
a.setStatus(hpa, currentReplicas, desiredReplicas, metricStatuses, rescale)
|
||||
return a.updateStatusIfNeeded(hpaStatusOriginal, hpa)
|
||||
}
|
||||
|
||||
func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, timestamp time.Time) bool {
|
||||
|
@ -528,14 +542,14 @@ func (a *HorizontalController) shouldScale(hpa *autoscalingv2.HorizontalPodAutos
|
|||
return false
|
||||
}
|
||||
|
||||
func (a *HorizontalController) updateCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
|
||||
err := a.updateStatusWithReplicas(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
}
|
||||
// setCurrentReplicasInStatus sets the current replica count in the status of the HPA.
|
||||
func (a *HorizontalController) setCurrentReplicasInStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas int32) {
|
||||
a.setStatus(hpa, currentReplicas, hpa.Status.DesiredReplicas, hpa.Status.CurrentMetrics, false)
|
||||
}
|
||||
|
||||
func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) error {
|
||||
// setStatus recreates the status of the given HPA, updating the current and
|
||||
// desired replicas, as well as the metric statuses
|
||||
func (a *HorizontalController) setStatus(hpa *autoscalingv2.HorizontalPodAutoscaler, currentReplicas, desiredReplicas int32, metricStatuses []autoscalingv2.MetricStatus, rescale bool) {
|
||||
hpa.Status = autoscalingv2.HorizontalPodAutoscalerStatus{
|
||||
CurrentReplicas: currentReplicas,
|
||||
DesiredReplicas: desiredReplicas,
|
||||
|
@ -548,11 +562,19 @@ func (a *HorizontalController) updateStatusWithReplicas(hpa *autoscalingv2.Horiz
|
|||
now := metav1.NewTime(time.Now())
|
||||
hpa.Status.LastScaleTime = &now
|
||||
}
|
||||
|
||||
return a.update(hpa)
|
||||
}
|
||||
|
||||
func (a *HorizontalController) update(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
|
||||
// updateStatusIfNeeded calls updateStatus only if the status of the new HPA is not the same as the old status
|
||||
func (a *HorizontalController) updateStatusIfNeeded(oldStatus *autoscalingv2.HorizontalPodAutoscalerStatus, newHPA *autoscalingv2.HorizontalPodAutoscaler) error {
|
||||
// skip a write if we wouldn't need to update
|
||||
if apiequality.Semantic.DeepEqual(oldStatus, &newHPA.Status) {
|
||||
return nil
|
||||
}
|
||||
return a.updateStatus(newHPA)
|
||||
}
|
||||
|
||||
// updateStatus actually does the update request for the status of the given HPA
|
||||
func (a *HorizontalController) updateStatus(hpa *autoscalingv2.HorizontalPodAutoscaler) error {
|
||||
// convert back to autoscalingv1
|
||||
hpaRaw, err := UnsafeConvertToVersionVia(hpa, autoscalingv1.SchemeGroupVersion)
|
||||
if err != nil {
|
||||
|
|
|
@ -538,7 +538,7 @@ func (tc *testCase) verifyResults(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func (tc *testCase) runTest(t *testing.T) {
|
||||
func (tc *testCase) setupController(t *testing.T) (*HorizontalController, informers.SharedInformerFactory) {
|
||||
testClient, testMetricsClient, testCMClient := tc.prepareTestClient(t)
|
||||
if tc.testClient != nil {
|
||||
testClient = tc.testClient
|
||||
|
@ -598,6 +598,10 @@ func (tc *testCase) runTest(t *testing.T) {
|
|||
)
|
||||
hpaController.hpaListerSynced = alwaysReady
|
||||
|
||||
return hpaController, informerFactory
|
||||
}
|
||||
|
||||
func (tc *testCase) runTestWithController(t *testing.T, hpaController *HorizontalController, informerFactory informers.SharedInformerFactory) {
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
informerFactory.Start(stop)
|
||||
|
@ -616,6 +620,11 @@ func (tc *testCase) runTest(t *testing.T) {
|
|||
tc.verifyResults(t)
|
||||
}
|
||||
|
||||
func (tc *testCase) runTest(t *testing.T) {
|
||||
hpaController, informerFactory := tc.setupController(t)
|
||||
tc.runTestWithController(t, hpaController, informerFactory)
|
||||
}
|
||||
|
||||
func TestScaleUp(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 2,
|
||||
|
@ -1594,4 +1603,73 @@ func TestScaleDownRCImmediately(t *testing.T) {
|
|||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestAvoidUncessaryUpdates(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 2,
|
||||
maxReplicas: 6,
|
||||
initialReplicas: 3,
|
||||
desiredReplicas: 3,
|
||||
CPUTarget: 30,
|
||||
CPUCurrent: 40,
|
||||
verifyCPUCurrent: true,
|
||||
reportedLevels: []uint64{400, 500, 700},
|
||||
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
|
||||
reportedPodReadiness: []v1.ConditionStatus{v1.ConditionTrue, v1.ConditionFalse, v1.ConditionFalse},
|
||||
useMetricsApi: true,
|
||||
}
|
||||
testClient, _, _ := tc.prepareTestClient(t)
|
||||
tc.testClient = testClient
|
||||
var savedHPA *autoscalingv1.HorizontalPodAutoscaler
|
||||
testClient.PrependReactor("list", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
tc.Lock()
|
||||
defer tc.Unlock()
|
||||
|
||||
if savedHPA != nil {
|
||||
// fake out the verification logic and mark that we're done processing
|
||||
go func() {
|
||||
// wait a tick and then mark that we're finished (otherwise, we have no
|
||||
// way to indicate that we're finished, because the function decides not to do anything)
|
||||
time.Sleep(1 * time.Second)
|
||||
tc.statusUpdated = true
|
||||
tc.processed <- "test-hpa"
|
||||
}()
|
||||
return true, &autoscalingv1.HorizontalPodAutoscalerList{
|
||||
Items: []autoscalingv1.HorizontalPodAutoscaler{*savedHPA},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// fallthrough
|
||||
return false, nil, nil
|
||||
})
|
||||
testClient.PrependReactor("update", "horizontalpodautoscalers", func(action core.Action) (handled bool, ret runtime.Object, err error) {
|
||||
tc.Lock()
|
||||
defer tc.Unlock()
|
||||
|
||||
if savedHPA == nil {
|
||||
// save the HPA and return it
|
||||
savedHPA = action.(core.UpdateAction).GetObject().(*autoscalingv1.HorizontalPodAutoscaler)
|
||||
return true, savedHPA, nil
|
||||
}
|
||||
|
||||
assert.Fail(t, "should not have attempted to update the HPA when nothing changed")
|
||||
// mark that we've processed this HPA
|
||||
tc.processed <- ""
|
||||
return true, nil, fmt.Errorf("unexpected call")
|
||||
})
|
||||
|
||||
controller, informerFactory := tc.setupController(t)
|
||||
|
||||
// fake an initial processing loop to populate savedHPA
|
||||
initialHPAs, err := testClient.Autoscaling().HorizontalPodAutoscalers("test-namespace").List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if err := controller.reconcileAutoscaler(&initialHPAs.Items[0]); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
// actually run the test
|
||||
tc.runTestWithController(t, controller, informerFactory)
|
||||
}
|
||||
|
||||
// TODO: add more tests
|
||||
|
|
Loading…
Reference in New Issue