mirror of https://github.com/k3s-io/k3s
commit
5a099308f5
|
@ -17,12 +17,14 @@ limitations under the License.
|
|||
package podautoscaler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
|
@ -35,6 +37,9 @@ const (
|
|||
// Usage shoud exceed the tolerance before we start downscale or upscale the pods.
|
||||
// TODO: make it a flag or HPA spec element.
|
||||
tolerance = 0.1
|
||||
|
||||
HpaCustomMetricsDefinitionAnnotationName = "alpha/definiton.custom-metrics.podautoscaler.kubernetes.io"
|
||||
HpaCustomMetricsStatusAnnotationName = "alpha/status.custom-metrics.podautoscaler.kubernetes.io"
|
||||
)
|
||||
|
||||
type HorizontalController struct {
|
||||
|
@ -93,6 +98,71 @@ func (a *HorizontalController) computeReplicasForCPUUtilization(hpa extensions.H
|
|||
}
|
||||
}
|
||||
|
||||
// Computes the desired number of replicas based on the CustomMetrics passed in cmAnnotation as json-serialized
|
||||
// extensions.CustomMetricsTargetList.
|
||||
// Returns number of replicas, status string (also json-serialized extensions.CustomMetricsCurrentStatusList),
|
||||
// last timestamp of the metrics involved in computations or error, if occurred.
|
||||
func (a *HorizontalController) computeReplicasForCustomMetrics(hpa extensions.HorizontalPodAutoscaler, scale *extensions.Scale,
|
||||
cmAnnotation string) (int, string, time.Time, error) {
|
||||
|
||||
currentReplicas := scale.Status.Replicas
|
||||
replicas := 0
|
||||
timestamp := time.Time{}
|
||||
|
||||
if cmAnnotation == "" {
|
||||
return 0, "", time.Time{}, nil
|
||||
}
|
||||
|
||||
var targetList extensions.CustomMetricTargetList
|
||||
if err := json.Unmarshal([]byte(cmAnnotation), &targetList); err != nil {
|
||||
return 0, "", time.Time{}, fmt.Errorf("failed to parse custom metrics annotation: %v", err)
|
||||
}
|
||||
if len(targetList.Items) == 0 {
|
||||
return 0, "", time.Time{}, fmt.Errorf("no custom metrics in annotation")
|
||||
}
|
||||
|
||||
statusList := extensions.CustomMetricCurrentStatusList{
|
||||
Items: make([]extensions.CustomMetricCurrentStatus, 0),
|
||||
}
|
||||
|
||||
for _, customMetricTarget := range targetList.Items {
|
||||
value, currentTimestamp, err := a.metricsClient.GetCustomMetric(customMetricTarget.Name, hpa.Namespace, scale.Status.Selector)
|
||||
// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
|
||||
if err != nil {
|
||||
a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedGetCustomMetrics", err.Error())
|
||||
return 0, "", time.Time{}, fmt.Errorf("failed to get custom metric value: %v", err)
|
||||
}
|
||||
floatTarget := float64(customMetricTarget.TargetValue.MilliValue()) / 1000.0
|
||||
usageRatio := *value / floatTarget
|
||||
|
||||
replicaCountProposal := 0
|
||||
if math.Abs(1.0-usageRatio) > tolerance {
|
||||
replicaCountProposal = int(math.Ceil(usageRatio * float64(currentReplicas)))
|
||||
} else {
|
||||
replicaCountProposal = currentReplicas
|
||||
}
|
||||
if replicaCountProposal > replicas {
|
||||
timestamp = currentTimestamp
|
||||
replicas = replicaCountProposal
|
||||
}
|
||||
quantity, err := resource.ParseQuantity(fmt.Sprintf("%.3f", *value))
|
||||
if err != nil {
|
||||
return 0, "", time.Time{}, fmt.Errorf("failed to set custom metric value: %v", err)
|
||||
}
|
||||
statusList.Items = append(statusList.Items, extensions.CustomMetricCurrentStatus{
|
||||
Name: customMetricTarget.Name,
|
||||
CurrentValue: *quantity,
|
||||
})
|
||||
}
|
||||
byteStatusList, err := json.Marshal(statusList)
|
||||
if err != nil {
|
||||
return 0, "", time.Time{}, fmt.Errorf("failed to serialize custom metric status: %v", err)
|
||||
}
|
||||
|
||||
return replicas, string(byteStatusList), timestamp, nil
|
||||
|
||||
}
|
||||
|
||||
func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodAutoscaler) error {
|
||||
reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Namespace, hpa.Spec.ScaleRef.Name)
|
||||
|
||||
|
@ -103,11 +173,41 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
|
|||
}
|
||||
currentReplicas := scale.Status.Replicas
|
||||
|
||||
desiredReplicas, currentUtilization, timestamp, err := a.computeReplicasForCPUUtilization(hpa, scale)
|
||||
cpuDesiredReplicas := 0
|
||||
var cpuCurrentUtilization *int = nil
|
||||
cpuTimestamp := time.Time{}
|
||||
|
||||
cmDesiredReplicas := 0
|
||||
cmStatus := ""
|
||||
cmTimestamp := time.Time{}
|
||||
|
||||
if hpa.Spec.CPUUtilization != nil {
|
||||
cpuDesiredReplicas, cpuCurrentUtilization, cpuTimestamp, err = a.computeReplicasForCPUUtilization(hpa, scale)
|
||||
if err != nil {
|
||||
a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeReplicas", err.Error())
|
||||
return fmt.Errorf("failed to compute desired number of replicas based on CPU utilization for %s: %v", reference, err)
|
||||
}
|
||||
}
|
||||
|
||||
if cmAnnotation, cmAnnotationFound := hpa.Annotations[HpaCustomMetricsDefinitionAnnotationName]; cmAnnotationFound {
|
||||
cmDesiredReplicas, cmStatus, cmTimestamp, err = a.computeReplicasForCustomMetrics(hpa, scale, cmAnnotation)
|
||||
if err != nil {
|
||||
a.eventRecorder.Event(&hpa, api.EventTypeWarning, "FailedComputeCMReplicas", err.Error())
|
||||
return fmt.Errorf("failed to compute desired number of replicas based on Custom Metrics for %s: %v", reference, err)
|
||||
}
|
||||
}
|
||||
|
||||
desiredReplicas := 0
|
||||
timestamp := time.Time{}
|
||||
|
||||
if cpuDesiredReplicas > desiredReplicas {
|
||||
desiredReplicas = cpuDesiredReplicas
|
||||
timestamp = cpuTimestamp
|
||||
}
|
||||
if cmDesiredReplicas > desiredReplicas {
|
||||
desiredReplicas = cmDesiredReplicas
|
||||
timestamp = cmTimestamp
|
||||
}
|
||||
|
||||
if hpa.Spec.MinReplicas != nil && desiredReplicas < *hpa.Spec.MinReplicas {
|
||||
desiredReplicas = *hpa.Spec.MinReplicas
|
||||
|
@ -158,9 +258,13 @@ func (a *HorizontalController) reconcileAutoscaler(hpa extensions.HorizontalPodA
|
|||
hpa.Status = extensions.HorizontalPodAutoscalerStatus{
|
||||
CurrentReplicas: currentReplicas,
|
||||
DesiredReplicas: desiredReplicas,
|
||||
CurrentCPUUtilizationPercentage: currentUtilization,
|
||||
CurrentCPUUtilizationPercentage: cpuCurrentUtilization,
|
||||
LastScaleTime: hpa.Status.LastScaleTime,
|
||||
}
|
||||
if cmStatus != "" {
|
||||
hpa.Annotations[HpaCustomMetricsStatusAnnotationName] = cmStatus
|
||||
}
|
||||
|
||||
if rescale {
|
||||
now := unversioned.NewTime(time.Now())
|
||||
hpa.Status.LastScaleTime = &now
|
||||
|
|
|
@ -62,6 +62,7 @@ type testCase struct {
|
|||
CPUTarget int
|
||||
reportedLevels []uint64
|
||||
reportedCPURequests []resource.Quantity
|
||||
cmTarget *extensions.CustomMetricTargetList
|
||||
scaleUpdated bool
|
||||
eventCreated bool
|
||||
verifyEvents bool
|
||||
|
@ -101,6 +102,14 @@ func (tc *testCase) prepareTestClient(t *testing.T) *testclient.Fake {
|
|||
if tc.CPUTarget > 0.0 {
|
||||
obj.Items[0].Spec.CPUUtilization = &extensions.CPUTargetUtilization{TargetPercentage: tc.CPUTarget}
|
||||
}
|
||||
if tc.cmTarget != nil {
|
||||
b, err := json.Marshal(tc.cmTarget)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to marshal cm: %v", err)
|
||||
}
|
||||
obj.Items[0].Annotations = make(map[string]string)
|
||||
obj.Items[0].Annotations[HpaCustomMetricsDefinitionAnnotationName] = string(b)
|
||||
}
|
||||
return true, obj, nil
|
||||
})
|
||||
|
||||
|
@ -229,6 +238,25 @@ func TestScaleUp(t *testing.T) {
|
|||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestScaleUpCM(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 2,
|
||||
maxReplicas: 6,
|
||||
initialReplicas: 3,
|
||||
desiredReplicas: 4,
|
||||
CPUTarget: 0,
|
||||
cmTarget: &extensions.CustomMetricTargetList{
|
||||
Items: []extensions.CustomMetricTarget{{
|
||||
Name: "qps",
|
||||
TargetValue: resource.MustParse("15.0"),
|
||||
}},
|
||||
},
|
||||
reportedLevels: []uint64{20, 10, 30},
|
||||
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
|
||||
}
|
||||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestScaleDown(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 2,
|
||||
|
@ -242,6 +270,24 @@ func TestScaleDown(t *testing.T) {
|
|||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestScaleDownCM(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 2,
|
||||
maxReplicas: 6,
|
||||
initialReplicas: 5,
|
||||
desiredReplicas: 3,
|
||||
CPUTarget: 0,
|
||||
cmTarget: &extensions.CustomMetricTargetList{
|
||||
Items: []extensions.CustomMetricTarget{{
|
||||
Name: "qps",
|
||||
TargetValue: resource.MustParse("20"),
|
||||
}}},
|
||||
reportedLevels: []uint64{12, 12, 12, 12, 12},
|
||||
reportedCPURequests: []resource.Quantity{resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0"), resource.MustParse("1.0")},
|
||||
}
|
||||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestTolerance(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 1,
|
||||
|
@ -255,6 +301,23 @@ func TestTolerance(t *testing.T) {
|
|||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestToleranceCM(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 1,
|
||||
maxReplicas: 5,
|
||||
initialReplicas: 3,
|
||||
desiredReplicas: 3,
|
||||
cmTarget: &extensions.CustomMetricTargetList{
|
||||
Items: []extensions.CustomMetricTarget{{
|
||||
Name: "qps",
|
||||
TargetValue: resource.MustParse("20"),
|
||||
}}},
|
||||
reportedLevels: []uint64{20, 21, 21},
|
||||
reportedCPURequests: []resource.Quantity{resource.MustParse("0.9"), resource.MustParse("1.0"), resource.MustParse("1.1")},
|
||||
}
|
||||
tc.runTest(t)
|
||||
}
|
||||
|
||||
func TestMinReplicas(t *testing.T) {
|
||||
tc := testCase{
|
||||
minReplicas: 2,
|
||||
|
|
Loading…
Reference in New Issue