diff --git a/pkg/controller/podautoscaler/horizontal.go b/pkg/controller/podautoscaler/horizontal.go
index 4220f6158f..b73c1b15c6 100644
--- a/pkg/controller/podautoscaler/horizontal.go
+++ b/pkg/controller/podautoscaler/horizontal.go
@@ -17,12 +17,14 @@ limitations under the License.
 package podautoscaler
 
 import (
+	"encoding/json"
 	"fmt"
 	"math"
 	"time"
 
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/record"
@@ -35,6 +37,9 @@ const (
 	// Usage shoud exceed the tolerance before we start downscale or upscale the pods.
 	// TODO: make it a flag or HPA spec element.
 	tolerance = 0.1
+
+	HpaCustomMetricsDefinitionAnnotationName = "alpha/definition.custom-metrics.podautoscaler.kubernetes.io"
+	HpaCustomMetricsStatusAnnotationName     = "alpha/status.custom-metrics.podautoscaler.kubernetes.io"
 )
 
 type HorizontalController struct {
@@ -93,6 +98,71 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.H
 	}
 }
 
+// computeReplicasForCustomMetrics computes the desired number of replicas based on the custom metrics passed
+// in cmAnnotation as a json-serialized extensions.CustomMetricTargetList.
+// Returns the number of replicas, a status string (a json-serialized extensions.CustomMetricCurrentStatusList),
+// the timestamp of the metric that produced the highest proposal, and an error, if one occurred.
+func (a *HorizontalController) computeReplicasForCustomMetrics(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale,
+	cmAnnotation string) (int, string, time.Time, error) {
+
+	currentReplicas := scale.Status.Replicas
+	replicas := 0
+	timestamp := time.Time{}
+
+	if cmAnnotation == "" {
+		return 0, "", time.Time{}, nil
+	}
+
+	var targetList extensions.CustomMetricTargetList
+	if err := json.Unmarshal([]byte(cmAnnotation), &targetList); err != nil {
+		return 0, "", time.Time{}, fmt.Errorf("failed to parse custom metrics annotation: %v", err)
+	}
+	if len(targetList.Items) == 0 {
+		return 0, "", time.Time{}, fmt.Errorf("no custom metrics in annotation")
+	}
+
+	statusList := extensions.CustomMetricCurrentStatusList{
+		Items: make([]extensions.CustomMetricCurrentStatus, 0),
+	}
+
+	for _, customMetricTarget := range targetList.Items {
+		value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, scale.Status.Selector)
+		// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
+		if err != nil {
+			a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedGetCustomMetrics", err.Error())
+			return 0, "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
+		}
+		floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
+		usageRatio := *value / floatTarget
+
+		replicaCountProposal := 0
+		if math.Abs(1.0-usageRatio) > tolerance {
+			replicaCountProposal = int(math.Ceil(usageRatio * float64(currentReplicas)))
+		} else {
+			replicaCountProposal = currentReplicas
+		}
+		if replicaCountProposal > replicas {
+			timestamp = currentTimestamp
+			replicas = replicaCountProposal
+		}
+		quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value))
+		if err != nil {
+			return 0, "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
+		}
+		statusList.Items = append(statusList.Items, extensions.CustomMetricCurrentStatus{
+			Name:         customMetricTarget.Name,
+			CurrentValue: *quantity,
+		})
+	}
+	byteStatusList, err := json.Marshal(statusList)
+	if err != nil {
+		return 0, "", time.Time{}, fmt.Errorf("failed to serialize custom metric status: %v", err)
+	}
+
+	return replicas, string(byteStatusList), timestamp, nil
+
+}
+
 func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
 	reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Namespace, hpa.Spec.ScaleRef.Name)
 
@@ -103,10 +173,40 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 	}
 
 	currentReplicas := scale.Status.Replicas
-	desiredReplicas, currentUtilization, timestamp, err := a.computeReplicasForCPUUtilization(hpa, scale)
-	if err != nil {
-		a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
-		return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
+	cpuDesiredReplicas := 0
+	var cpuCurrentUtilization *int = nil
+	cpuTimestamp := time.Time{}
+
+	cmDesiredReplicas := 0
+	cmStatus := ""
+	cmTimestamp := time.Time{}
+
+	if hpa.Spec.CPUUtilization != nil {
+		cpuDesiredReplicas, cpuCurrentUtilization, cpuTimestamp, err = a.computeReplicasForCPUUtilization(hpa, scale)
+		if err != nil {
+			a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
+			return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
+		}
+	}
+
+	if cmAnnotation, cmAnnotationFound := hpa.Annotations[HpaCustomMetricsDefinitionAnnotationName]; cmAnnotationFound {
+		cmDesiredReplicas, cmStatus, cmTimestamp, err = a.computeReplicasForCustomMetrics(hpa, scale, cmAnnotation)
+		if err != nil {
+			a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeCMReplicas", err.Error())
+			return fmt.Errorf("failed to compute desired number of replicas based on Custom Metrics for %s: %v", reference, err)
+		}
+	}
+
+	desiredReplicas := 0
+	timestamp := time.Time{}
+
+	if cpuDesiredReplicas > desiredReplicas {
+		desiredReplicas = cpuDesiredReplicas
+		timestamp = cpuTimestamp
+	}
+	if cmDesiredReplicas > desiredReplicas {
+		desiredReplicas = cmDesiredReplicas
+		timestamp = cmTimestamp
 	}
 
 	if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
@@ -158,9 +258,13 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
 	hpa.Status = extensions.HorizontalPodAutoscalerStatus{
 		CurrentReplicas:                 currentReplicas,
 		DesiredReplicas:                 desiredReplicas,
-		CurrentCPUUtilizationPercentage: currentUtilization,
+		CurrentCPUUtilizationPercentage: cpuCurrentUtilization,
 		LastScaleTime:                   hpa.Status.LastScaleTime,
 	}
+	if cmStatus != "" {
+		hpa.Annotations[HpaCustomMetricsStatusAnnotationName] = cmStatus
+	}
+
 	if rescale {
 		now := unversioned.NewTime(time.Now())
 		hpa.Status.LastScaleTime = &now
diff --git a/pkg/controller/podautoscaler/horizontal_test.go b/pkg/controller/podautoscaler/horizontal_test.go
index bd0bd63b69..c60bb812e5 100644
--- a/pkg/controller/podautoscaler/horizontal_test.go
+++ b/pkg/controller/podautoscaler/horizontal_test.go
@@ -62,6 +62,7 @@ type testCase struct {
 	CPUTarget           int
 	reportedLevels      []uint64
 	reportedCPURequests []resource.Quantity
+	cmTarget            *extensions.CustomMetricTargetList
 	scaleUpdated        bool
 	eventCreated        bool
 	verifyEvents        bool
@@ -101,6 +102,14 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
 		if tc.CPUTarget > 0.0 {
 			obj.Items[0].Spec.CPUUtilization = &extensions.CPUTargetUtilization{TargetPercentage: tc.CPUTarget}
 		}
+		if tc.cmTarget != nil {
+			b, err := json.Marshal(tc.cmTarget)
+			if err != nil {
+				t.Fatalf("Failed to marshal custom metric target list: %v", err)
+			}
+			obj.Items[0].Annotations = make(map[string]string)
+			obj.Items[0].Annotations[HpaCustomMetricsDefinitionAnnotationName] = string(b)
+		}
 		return true, obj, nil
 	})
 
@@ -229,6 +238,25 @@ func TestScaleUp(t *testing.T) {
 	tc.runTest(t)
 }
 
+func TestScaleUpCM(t *testing.T) {
+	tc := testCase{
+		minReplicas:     2,
+		maxReplicas:     6,
+		initialReplicas: 3,
+		desiredReplicas: 4,
+		CPUTarget:       0,
+		cmTarget: &extensions.CustomMetricTargetList{
+			Items: []extensions.CustomMetricTarget{{
+				Name:        "qps",
+				TargetValue: resource.MustParse("15.0"),
+			}},
+		},
+		reportedLevels:      []uint64{20, 10, 30},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
+	}
+	tc.runTest(t)
+}
+
 func TestScaleDown(t *testing.T) {
 	tc := testCase{
 		minReplicas:     2,
@@ -242,6 +270,24 @@ func TestScaleDown(t *testing.T) {
 	tc.runTest(t)
 }
 
+func TestScaleDownCM(t *testing.T) {
+	tc := testCase{
+		minReplicas:     2,
+		maxReplicas:     6,
+		initialReplicas: 5,
+		desiredReplicas: 3,
+		CPUTarget:       0,
+		cmTarget: &extensions.CustomMetricTargetList{
+			Items: []extensions.CustomMetricTarget{{
+				Name:        "qps",
+				TargetValue: resource.MustParse("20"),
+			}}},
+		reportedLevels:      []uint64{12, 12, 12, 12, 12},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
+	}
+	tc.runTest(t)
+}
+
 func TestTolerance(t *testing.T) {
 	tc := testCase{
 		minReplicas:     1,
@@ -255,6 +301,23 @@ func TestTolerance(t *testing.T) {
 	tc.runTest(t)
 }
 
+func TestToleranceCM(t *testing.T) {
+	tc := testCase{
+		minReplicas:     1,
+		maxReplicas:     5,
+		initialReplicas: 3,
+		desiredReplicas: 3,
+		cmTarget: &extensions.CustomMetricTargetList{
+			Items: []extensions.CustomMetricTarget{{
+				Name:        "qps",
+				TargetValue: resource.MustParse("20"),
+			}}},
+		reportedLevels:      []uint64{20, 21, 21},
+		reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
+	}
+	tc.runTest(t)
+}
+
 func TestMinReplicas(t *testing.T) {
 	tc := testCase{
 		minReplicas:     2,
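
A note on the annotation wire format: the definition annotation's value is nothing more than a json-serialized extensions.CustomMetricTargetList, and the status written back under HpaCustomMetricsStatusAnnotationName has the analogous shape (extensions.CustomMetricCurrentStatusList, with CurrentValue rendered via fmt.Sprintf("%.3f", ...)). The sketch below produces such a payload without importing the Kubernetes tree; the local structs and their json tags ("name", "value", "items") are assumptions mirroring the extensions types, and resource.Quantity is simplified to a plain string.

package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical stand-ins for extensions.CustomMetricTarget and
// extensions.CustomMetricTargetList; the real types live in
// k8s.io/kubernetes/pkg/apis/extensions and use resource.Quantity
// for TargetValue. The json tags here are assumed for illustration.
type customMetricTarget struct {
	Name        string `json:"name"`
	TargetValue string `json:"value"` // a resource.Quantity serializes as a string, e.g. "15"
}

type customMetricTargetList struct {
	Items []customMetricTarget `json:"items"`
}

func main() {
	payload, err := json.Marshal(customMetricTargetList{
		Items: []customMetricTarget{{Name: "qps", TargetValue: "15"}},
	})
	if err != nil {
		panic(err)
	}
	// Prints a payload like {"items":[{"name":"qps","value":"15"}]},
	// which the controller would read from the HPA's annotations under
	// HpaCustomMetricsDefinitionAnnotationName.
	fmt.Println(string(payload))
}

Attaching the payload is then a single map write, hpa.Annotations[HpaCustomMetricsDefinitionAnnotationName] = string(payload), which is exactly what prepareTestClient does with the marshaled tc.cmTarget.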
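The replica arithmetic in computeReplicasForCustomMetrics is small enough to sanity-check against the new tests by hand: propose ceil(usageRatio * currentReplicas) when the usage ratio falls outside the 10% tolerance band, otherwise keep the current count. A minimal sketch, assuming (as the tests imply) that GetCustomMetric returns the average metric value per pod; proposeReplicas is a hypothetical helper, not a function from this patch:

package main

import (
	"fmt"
	"math"
)

// tolerance mirrors the controller's constant: proposals within 10%
// of the target do not trigger a rescale.
const tolerance = 0.1

// proposeReplicas reproduces the loop body of
// computeReplicasForCustomMetrics for a single metric.
func proposeReplicas(currentReplicas int, currentValue, targetValue float64) int {
	usageRatio := currentValue / targetValue
	if math.Abs(1.0-usageRatio) <= tolerance {
		return currentReplicas // inside the tolerance band: keep the current count
	}
	return int(math.Ceil(usageRatio * float64(currentReplicas)))
}

func main() {
	// TestScaleUpCM: levels {20, 10, 30} average to 20 qps against a target of 15.
	fmt.Println(proposeReplicas(3, 20, 15)) // 4, matching desiredReplicas: 4
	// TestToleranceCM: levels {20, 21, 21} average to ~20.67 against a target of 20.
	fmt.Println(proposeReplicas(3, 20.67, 20)) // 3, inside the 10% band, no change
}

TestScaleDownCM follows the same rule: an average of 12 qps against a target of 20 gives ceil(0.6 * 5) = 3 replicas, and reconcileAutoscaler then takes the maximum of the CPU and custom-metric proposals before clamping to minReplicas/maxReplicas.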