MetricsClient for HorizontalPodAutoscaler

pull/6/head
Marcin Wielgus 2015-08-28 12:24:00 +02:00
parent cb2252b57f
commit 064b09ff0b
5 changed files with 350 additions and 160 deletions

View File

@@ -36,6 +36,7 @@ import (
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/autoscaler"
"k8s.io/kubernetes/pkg/controller/autoscaler/metrics"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/pkg/controller/node"
@@ -276,7 +277,8 @@ func (s *CMServer) Run(_ []string) error {
if err != nil {
glog.Fatalf("Invalid API configuration: %v", err)
}
horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient)
horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient,
metrics.NewHeapsterMetricsClient(kubeClient))
horizontalPodAutoscalerController.Run(s.HorizontalPodAutoscalerSyncPeriod)
}
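The change above is plain dependency injection: the controller-manager now builds a Heapster-backed MetricsClient and hands it to the autoscaler controller, which no longer queries Heapster itself. A minimal sketch of the new wiring, using the package aliases from this diff and an illustrative 30-second sync period in place of s.HorizontalPodAutoscalerSyncPeriod:

// Sketch only: kubeClient and expClient are assumed to be the already
// constructed unversioned clients used elsewhere in CMServer.Run.
var metricsClient metrics.MetricsClient = metrics.NewHeapsterMetricsClient(kubeClient)
hpaController := autoscalercontroller.New(kubeClient, expClient, metricsClient)
hpaController.Run(30 * time.Second)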

View File

@@ -17,64 +17,33 @@ limitations under the License.
package autoscalercontroller
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/autoscaler/metrics"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
heapster "k8s.io/heapster/api/v1/types"
)
const (
heapsterNamespace = "kube-system"
heapsterService = "monitoring-heapster"
)
type HorizontalPodAutoscalerController struct {
client client.Interface
expClient client.ExperimentalInterface
client client.Interface
expClient client.ExperimentalInterface
metricsClient metrics.MetricsClient
}
// Aggregates results into ResourceConsumption. Also returns number of
// pods included in the aggregation.
type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int)
type metricDefinition struct {
name string
aggregator metricAggregator
}
var resourceDefinitions = map[api.ResourceName]metricDefinition{
//TODO: add memory
api.ResourceCPU: {"cpu-usage",
func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) {
sum, count := calculateSumFromLatestSample(metrics)
value := "0"
if count > 0 {
// assumes that cpu usage is in millis
value = fmt.Sprintf("%dm", sum/uint64(count))
}
return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
}},
}
var heapsterQueryStart, _ = time.ParseDuration("-5m")
var downscaleForbiddenWindow, _ = time.ParseDuration("20m")
var upscaleForbiddenWindow, _ = time.ParseDuration("3m")
func New(client client.Interface, expClient client.ExperimentalInterface) *HorizontalPodAutoscalerController {
func New(client client.Interface, expClient client.ExperimentalInterface, metricsClient metrics.MetricsClient) *HorizontalPodAutoscalerController {
return &HorizontalPodAutoscalerController{
client: client,
expClient: expClient,
client: client,
expClient: expClient,
metricsClient: metricsClient,
}
}
@@ -100,57 +69,18 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
glog.Warningf("Failed to query scale subresource for %s: %v", reference, err)
continue
}
podList, err := a.client.Pods(hpa.Spec.ScaleRef.Namespace).
List(labels.SelectorFromSet(labels.Set(scale.Status.Selector)), fields.Everything())
currentReplicas := scale.Status.Replicas
currentConsumption, err := a.metricsClient.ResourceConsumption(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.Target.Resource,
scale.Status.Selector)
// TODO: what to do on partial errors (like metrics obtained for 75% of pods).
if err != nil {
glog.Warningf("Failed to get pod list for %s: %v", reference, err)
continue
}
podNames := []string{}
for _, pod := range podList.Items {
podNames = append(podNames, pod.Name)
}
metricSpec, metricDefined := resourceDefinitions[hpa.Spec.Target.Resource]
if !metricDefined {
glog.Warningf("Heapster metric not defined for %s %v", reference, hpa.Spec.Target.Resource)
continue
}
now := time.Now()
startTime := now.Add(heapsterQueryStart)
metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
hpa.Spec.ScaleRef.Namespace,
strings.Join(podNames, ","),
metricSpec.name)
resultRaw, err := a.client.Services(heapsterNamespace).
ProxyGet(heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
DoRaw()
if err != nil {
glog.Warningf("Failed to get pods metrics for %s: %v", reference, err)
continue
}
var metrics heapster.MetricResultList
err = json.Unmarshal(resultRaw, &metrics)
if err != nil {
glog.Warningf("Failed to unmarshall heapster response: %v", err)
continue
}
glog.Infof("Metrics available for %s: %s", reference, string(resultRaw))
currentConsumption, count := metricSpec.aggregator(metrics)
if count != len(podList.Items) {
glog.Warningf("Metrics obtained for %d/%d of pods", count, len(podList.Items))
glog.Warningf("Error while getting metrics for %s: %v", reference, err)
continue
}
// if the ratio is 1.2 we want to have 2 replicas
desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(count))/hpa.Spec.Target.Quantity.MilliValue())
desiredReplicas := 1 + int((currentConsumption.Quantity.MilliValue()*int64(currentReplicas))/hpa.Spec.Target.Quantity.MilliValue())
if desiredReplicas < hpa.Spec.MinCount {
desiredReplicas = hpa.Spec.MinCount
@@ -158,18 +88,17 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
if desiredReplicas > hpa.Spec.MaxCount {
desiredReplicas = hpa.Spec.MaxCount
}
now := time.Now()
rescale := false
if desiredReplicas != count {
if desiredReplicas != currentReplicas {
// Going down
if desiredReplicas < count && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
if desiredReplicas < currentReplicas && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
hpa.Status.LastScaleTimestamp.Add(downscaleForbiddenWindow).Before(now)) {
rescale = true
}
// Going up
if desiredReplicas > count && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
if desiredReplicas > currentReplicas && (hpa.Status == nil || hpa.Status.LastScaleTimestamp == nil ||
hpa.Status.LastScaleTimestamp.Add(upscaleForbiddenWindow).Before(now)) {
rescale = true
}
@@ -185,9 +114,9 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
}
status := expapi.HorizontalPodAutoscalerStatus{
CurrentReplicas: count,
CurrentReplicas: currentReplicas,
DesiredReplicas: desiredReplicas,
CurrentConsumption: &currentConsumption,
CurrentConsumption: currentConsumption,
}
hpa.Status = &status
if rescale {
@@ -203,22 +132,3 @@ func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error {
}
return nil
}
func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
sum := uint64(0)
count := 0
for _, metrics := range metrics.Items {
var newest *heapster.MetricPoint
newest = nil
for _, metricPoint := range metrics.Metrics {
if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
newest = &metricPoint
}
}
if newest != nil {
sum += newest.Value
count++
}
}
return sum, count
}
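The sizing rule in reconcileAutoscalers is unchanged by this commit; only its input moves from a freshly listed pod set to scale.Status.Replicas plus the averaged consumption returned by the metrics client. The rule is desiredReplicas = 1 + floor(averageConsumption * currentReplicas / target), clamped to [MinCount, MaxCount] and applied only outside the 20-minute downscale and 3-minute upscale forbidden windows. A worked sketch with illustrative numbers:

// Illustrative values only: 4 replicas averaging 600m CPU against a 500m target.
avgMilli := int64(600)      // currentConsumption.Quantity.MilliValue()
currentReplicas := int64(4) // scale.Status.Replicas
targetMilli := int64(500)   // hpa.Spec.Target.Quantity.MilliValue()

// ratio = 600*4/500 = 4.8, and "if the ratio is 1.2 we want to have 2 replicas",
// so: desired = 1 + floor(4.8) = 5, before clamping and the forbidden-window checks.
desiredReplicas := 1 + int((avgMilli*currentReplicas)/targetMilli) // == 5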

View File

@@ -17,39 +17,33 @@ limitations under the License.
package autoscalercontroller
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/autoscaler/metrics"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"github.com/golang/glog"
"github.com/stretchr/testify/assert"
heapster "k8s.io/heapster/api/v1/types"
)
const (
namespace = api.NamespaceDefault
rcName = "app-rc"
podNameLabel = "app"
podName = "p1"
hpaName = "foo"
hpaListHandler = "HpaList"
scaleHandler = "Scale"
podListHandler = "PodList"
heapsterHandler = "Heapster"
updateHpaHandler = "HpaUpdate"
)
@@ -58,6 +52,26 @@ type serverResponse struct {
obj interface{}
}
type fakeMetricsClient struct {
consumption metrics.ResourceConsumptionClient
}
type fakeResourceConsumptionClient struct {
metrics map[api.ResourceName]expapi.ResourceConsumption
}
func (f *fakeMetricsClient) ResourceConsumption(namespace string) metrics.ResourceConsumptionClient {
return f.consumption
}
func (f *fakeResourceConsumptionClient) Get(resource api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) {
consumption, found := f.metrics[resource]
if !found {
return nil, fmt.Errorf("resource not found: %v", resource)
}
return &consumption, nil
}
func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
handlers := map[string]*util.FakeHandler{}
@@ -73,16 +87,6 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
return &handler
}
mkRawHandler := func(url string, response serverResponse) *util.FakeHandler {
handler := util.FakeHandler{
StatusCode: response.statusCode,
ResponseBody: *response.obj.(*string),
}
mux.Handle(url, &handler)
glog.Infof("Will handle %s", url)
return &handler
}
if responses[hpaListHandler] != nil {
handlers[hpaListHandler] = mkHandler("/experimental/v1/horizontalpodautoscalers", *responses[hpaListHandler])
}
@@ -92,16 +96,6 @@ func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), *responses[scaleHandler])
}
if responses[podListHandler] != nil {
handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler])
}
if responses[heapsterHandler] != nil {
handlers[heapsterHandler] = mkRawHandler(
fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage",
namespace, podName), *responses[heapsterHandler])
}
if responses[updateHpaHandler] != nil {
handlers[updateHpaHandler] = mkHandler(fmt.Sprintf("/experimental/v1/namespaces/%s/horizontalpodautoscalers/%s", namespace, hpaName),
*responses[updateHpaHandler])
@@ -150,21 +144,6 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
},
}}
podListResponse := serverResponse{http.StatusOK, &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: namespace,
},
}}}}
timestamp := time.Now()
metrics := heapster.MetricResultList{
Items: []heapster.MetricResult{{
Metrics: []heapster.MetricPoint{{timestamp, 650}},
LatestTimestamp: timestamp,
}}}
status := expapi.HorizontalPodAutoscalerStatus{
CurrentReplicas: 1,
DesiredReplicas: 3,
@@ -189,16 +168,10 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Status: &status,
}}
heapsterRawResponse, _ := json.Marshal(&metrics)
heapsterStrResponse := string(heapsterRawResponse)
heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse}
testServer, handlers := makeTestServer(t,
map[string]*serverResponse{
hpaListHandler: &hpaResponse,
scaleHandler: &scaleResponse,
podListHandler: &podListResponse,
heapsterHandler: &heapsterResponse,
updateHpaHandler: &updateHpaResponse,
})
@@ -206,7 +179,13 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
expClient := client.NewExperimentalOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
hpaController := New(kubeClient, expClient)
fakeRC := fakeResourceConsumptionClient{metrics: map[api.ResourceName]expapi.ResourceConsumption{
api.ResourceCPU: {Resource: api.ResourceCPU, Quantity: resource.MustParse("650m")},
}}
fake := fakeMetricsClient{consumption: &fakeRC}
hpaController := New(kubeClient, expClient, &fake)
err := hpaController.reconcileAutoscalers()
if err != nil {
t.Fatalf("Failed to reconcile: %v", err)
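With metrics access behind an interface, the controller test no longer needs the pod-list and Heapster proxy handlers; the two small fakes above are enough. The same pattern also makes the failure branch ("Error while getting metrics for ...") easy to exercise; a hypothetical failing consumption client, not part of this commit:

// Hypothetical helper: a consumption client that always errors, so
// reconcileAutoscalers logs a warning and skips the HPA.
type erroringConsumptionClient struct{}

func (erroringConsumptionClient) Get(resource api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) {
	return nil, fmt.Errorf("no metrics for %v", resource)
}

// usage: New(kubeClient, expClient, &fakeMetricsClient{consumption: erroringConsumptionClient{}})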

View File

@@ -0,0 +1,168 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/expapi"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
heapster "k8s.io/heapster/api/v1/types"
)
const (
heapsterNamespace = "kube-system"
heapsterService = "monitoring-heapster"
)
var heapsterQueryStart, _ = time.ParseDuration("-5m")
// An interface for getting metrics for pods.
type MetricsClient interface {
ResourceConsumption(namespace string) ResourceConsumptionClient
}
type ResourceConsumptionClient interface {
// Gets average resource consumption for pods under the given selector.
Get(resourceName api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error)
}
// Aggregates results into ResourceConsumption. Also returns number of
// pods included in the aggregation.
type metricAggregator func(heapster.MetricResultList) (expapi.ResourceConsumption, int)
type metricDefinition struct {
name string
aggregator metricAggregator
}
// Heapster-based implementation of MetricsClient
type HeapsterMetricsClient struct {
client client.Interface
}
type HeapsterResourceConsumptionClient struct {
namespace string
client client.Interface
resourceDefinitions map[api.ResourceName]metricDefinition
}
func NewHeapsterMetricsClient(client client.Interface) *HeapsterMetricsClient {
return &HeapsterMetricsClient{client: client}
}
var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
//TODO: add memory
api.ResourceCPU: {"cpu-usage",
func(metrics heapster.MetricResultList) (expapi.ResourceConsumption, int) {
sum, count := calculateSumFromLatestSample(metrics)
value := "0"
if count > 0 {
// assumes that cpu usage is in millis
value = fmt.Sprintf("%dm", sum/uint64(count))
}
return expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse(value)}, count
}},
}
func (h *HeapsterMetricsClient) ResourceConsumption(namespace string) ResourceConsumptionClient {
return &HeapsterResourceConsumptionClient{
namespace: namespace,
client: h.client,
resourceDefinitions: heapsterMetricDefinitions,
}
}
func (h *HeapsterResourceConsumptionClient) Get(resourceName api.ResourceName, selector map[string]string) (*expapi.ResourceConsumption, error) {
podList, err := h.client.Pods(h.namespace).
List(labels.SelectorFromSet(labels.Set(selector)), fields.Everything())
if err != nil {
return nil, fmt.Errorf("failed to get pod list: %v", err)
}
podNames := []string{}
for _, pod := range podList.Items {
podNames = append(podNames, pod.Name)
}
return h.getForPods(resourceName, podNames)
}
func (h *HeapsterResourceConsumptionClient) getForPods(resourceName api.ResourceName, podNames []string) (*expapi.ResourceConsumption, error) {
metricSpec, metricDefined := h.resourceDefinitions[resourceName]
if !metricDefined {
return nil, fmt.Errorf("heapster metric not defined for %v", resourceName)
}
now := time.Now()
startTime := now.Add(heapsterQueryStart)
metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
h.namespace,
strings.Join(podNames, ","),
metricSpec.name)
resultRaw, err := h.client.Services(heapsterNamespace).
ProxyGet(heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
DoRaw()
if err != nil {
return nil, fmt.Errorf("failed to get pods metrics: %v", err)
}
var metrics heapster.MetricResultList
err = json.Unmarshal(resultRaw, &metrics)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall heapster response: %v", err)
}
glog.Infof("Metrics available: %s", string(resultRaw))
currentConsumption, count := metricSpec.aggregator(metrics)
if count != len(podNames) {
return nil, fmt.Errorf("metrics obtained for %d/%d of pods", count, len(podNames))
}
return &currentConsumption, nil
}
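For concreteness, getForPods only builds a Heapster model-API path and issues it through the service proxy of the monitoring-heapster service in kube-system. With illustrative namespace and pod names the request path looks like this:

// Illustrative values: namespace "default", pods p1 and p2, cpu-usage metric.
metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s",
	"default", strings.Join([]string{"p1", "p2"}, ","), "cpu-usage")
// => /api/v1/model/namespaces/default/pod-list/p1,p2/metrics/cpu-usage
// and, via ProxyGet, it is served under
// /api/v1/proxy/namespaces/kube-system/services/monitoring-heapster + metricPath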
func calculateSumFromLatestSample(metrics heapster.MetricResultList) (uint64, int) {
sum := uint64(0)
count := 0
for _, metrics := range metrics.Items {
var newest *heapster.MetricPoint
newest = nil
for _, metricPoint := range metrics.Metrics {
if newest == nil || newest.Timestamp.Before(metricPoint.Timestamp) {
newest = &metricPoint
}
}
if newest != nil {
sum += newest.Value
count++
}
}
return sum, count
}
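One subtlety in calculateSumFromLatestSample: newest = &metricPoint takes the address of the range variable, which the Go toolchains of this era reuse across iterations, so the pointer ends up describing the last sample in each series rather than the sample with the newest timestamp (the two coincide when Heapster returns points in chronological order). A sketch of the same aggregation written with an index, which avoids the aliasing:

// Index-based variant (sketch): pick each series' newest sample by timestamp.
func sumLatestSamples(metrics heapster.MetricResultList) (uint64, int) {
	sum := uint64(0)
	count := 0
	for _, result := range metrics.Items {
		newest := -1
		for i, point := range result.Metrics {
			if newest == -1 || result.Metrics[newest].Timestamp.Before(point.Timestamp) {
				newest = i
			}
		}
		if newest >= 0 {
			sum += result.Metrics[newest].Value
			count++
		}
	}
	return sum, count
}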

View File

@@ -0,0 +1,131 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/api/testapi"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
heapster "k8s.io/heapster/api/v1/types"
"github.com/golang/glog"
"github.com/stretchr/testify/assert"
)
const (
namespace = "test-namespace"
podName = "pod1"
podListHandler = "podlisthandler"
heapsterHandler = "heapsterhandler"
)
type serverResponse struct {
statusCode int
obj interface{}
}
func makeTestServer(t *testing.T, responses map[string]*serverResponse) (*httptest.Server, map[string]*util.FakeHandler) {
handlers := map[string]*util.FakeHandler{}
mux := http.NewServeMux()
mkHandler := func(url string, response serverResponse) *util.FakeHandler {
handler := util.FakeHandler{
StatusCode: response.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), response.obj.(runtime.Object)),
}
mux.Handle(url, &handler)
glog.Infof("Will handle %s", url)
return &handler
}
mkRawHandler := func(url string, response serverResponse) *util.FakeHandler {
handler := util.FakeHandler{
StatusCode: response.statusCode,
ResponseBody: *response.obj.(*string),
}
mux.Handle(url, &handler)
glog.Infof("Will handle %s", url)
return &handler
}
if responses[podListHandler] != nil {
handlers[podListHandler] = mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), *responses[podListHandler])
}
if responses[heapsterHandler] != nil {
handlers[heapsterHandler] = mkRawHandler(
fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage",
namespace, podName), *responses[heapsterHandler])
}
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound)
})
return httptest.NewServer(mux), handlers
}
func TestHeapsterResourceConsumptionGet(t *testing.T) {
podListResponse := serverResponse{http.StatusOK, &api.PodList{
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: podName,
Namespace: namespace,
},
}}}}
timestamp := time.Now()
metrics := heapster.MetricResultList{
Items: []heapster.MetricResult{{
Metrics: []heapster.MetricPoint{{timestamp, 650}},
LatestTimestamp: timestamp,
}}}
heapsterRawResponse, _ := json.Marshal(&metrics)
heapsterStrResponse := string(heapsterRawResponse)
heapsterResponse := serverResponse{http.StatusOK, &heapsterStrResponse}
testServer, _ := makeTestServer(t,
map[string]*serverResponse{
heapsterHandler: &heapsterResponse,
podListHandler: &podListResponse,
})
defer testServer.Close()
kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
metricsClient := NewHeapsterMetricsClient(kubeClient)
val, err := metricsClient.ResourceConsumption(namespace).Get(api.ResourceCPU, map[string]string{"app": "test"})
if err != nil {
t.Fatalf("Error while getting consumption: %v", err)
}
assert.Equal(t, int64(650), val.Quantity.MilliValue())
}