diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 77d153787b..b08124f97a 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -272,7 +272,7 @@ func (s *CMServer) Run(_ []string) error { // if err != nil { // glog.Fatalf("Invalid API configuration: %v", err) // } - // horizontalPodAutoscalerController := autoscalercontroller.New(expClient) + // horizontalPodAutoscalerController := autoscalercontroller.New(kubeClient, expClient) // horizontalPodAutoscalerController.Run(s.NodeSyncPeriod) select {} diff --git a/pkg/client/unversioned/experimental.go b/pkg/client/unversioned/experimental.go index 396aa7ce2c..512385dd98 100644 --- a/pkg/client/unversioned/experimental.go +++ b/pkg/client/unversioned/experimental.go @@ -33,6 +33,7 @@ import ( type ExperimentalInterface interface { VersionInterface HorizontalPodAutoscalersNamespacer + ScaleNamespacer } // ExperimentalClient is used to interact with experimental Kubernetes features. @@ -75,6 +76,10 @@ func (c *ExperimentalClient) HorizontalPodAutoscalers(namespace string) Horizont return newHorizontalPodAutoscalers(c, namespace) } +func (c *ExperimentalClient) Scales(namespace string) ScaleInterface { + return newScales(c, namespace) +} + // NewExperimental creates a new ExperimentalClient for the given config. This client // provides access to experimental Kubernetes features. // Experimental features are not supported and may be changed or removed in diff --git a/pkg/client/unversioned/scale.go b/pkg/client/unversioned/scale.go new file mode 100644 index 0000000000..4a3ba9a00f --- /dev/null +++ b/pkg/client/unversioned/scale.go @@ -0,0 +1,59 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package unversioned + +import ( + "fmt" + "strings" + + "k8s.io/kubernetes/pkg/expapi" +) + +type ScaleNamespacer interface { + Scales(namespace string) ScaleInterface +} + +// ScaleInterface has methods to work with Scale (sub)resources. +type ScaleInterface interface { + Get(string, string) (*expapi.Scale, error) +} + +// horizontalPodAutoscalers implements HorizontalPodAutoscalersNamespacer interface +type scales struct { + client *ExperimentalClient + ns string +} + +// newHorizontalPodAutoscalers returns a horizontalPodAutoscalers +func newScales(c *ExperimentalClient, namespace string) *scales { + return &scales{ + client: c, + ns: namespace, + } +} + +// Get takes the reference to scale subresource and returns the subresource or error, if one occurs. +func (c *scales) Get(kind string, name string) (result *expapi.Scale, err error) { + result = &expapi.Scale{} + if strings.ToLower(kind) == "replicationcontroller" { + kind = "replicationControllers" + err = c.client.Get().Namespace(c.ns).Resource(kind).Name(name).SubResource("scale").Do().Into(result) + return + } + err = fmt.Errorf("Kind not supported: %s", kind) + return +} diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go index d49d98a771..dfc2486845 100644 --- a/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller.go @@ -18,23 +18,37 @@ package autoscalercontroller import ( "fmt" + "strings" "time" "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/unversioned" + client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util" ) +const ( + heapsterNamespace = "kube-system" + heapsterService = "monitoring-heapster" +) + +var resourceToMetric = map[api.ResourceName]string{ + api.ResourceCPU: "cpu-usage", +} +var heapsterQueryStart, _ = time.ParseDuration("-20m") + type HorizontalPodAutoscalerController struct { - kubeClient unversioned.ExperimentalInterface + client *client.Client + expClient client.ExperimentalInterface } -func New(kubeClient unversioned.ExperimentalInterface) *HorizontalPodAutoscalerController { +func New(client *client.Client, expClient client.ExperimentalInterface) *HorizontalPodAutoscalerController { + //TODO: switch to client.Interface return &HorizontalPodAutoscalerController{ - kubeClient: kubeClient, + client: client, + expClient: expClient, } } @@ -48,11 +62,58 @@ func (a *HorizontalPodAutoscalerController) Run(syncPeriod time.Duration) { func (a *HorizontalPodAutoscalerController) reconcileAutoscalers() error { ns := api.NamespaceAll - list, err := a.kubeClient.HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything()) + list, err := a.expClient.HorizontalPodAutoscalers(ns).List(labels.Everything(), fields.Everything()) if err != nil { return fmt.Errorf("error listing nodes: %v", err) } - // TODO: implement! - glog.Info("autoscalers: %v", list) + for _, hpa := range list.Items { + reference := fmt.Sprintf("%s/%s/%s", hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Namespace, hpa.Spec.ScaleRef.Name) + + scale, err := a.expClient.Scales(hpa.Spec.ScaleRef.Namespace).Get(hpa.Spec.ScaleRef.Kind, hpa.Spec.ScaleRef.Name) + if err != nil { + glog.Warningf("Failed to query scale subresource for %s: %v", reference, err) + continue + } + podList, err := a.client.Pods(hpa.Spec.ScaleRef.Namespace). + List(labels.SelectorFromSet(labels.Set(scale.Status.Selector)), fields.Everything()) + + if err != nil { + glog.Warningf("Failed to get pod list for %s: %v", reference, err) + continue + } + podNames := []string{} + for _, pod := range podList.Items { + podNames = append(podNames, pod.Name) + } + + metric, metricDefined := resourceToMetric[hpa.Spec.Target.Resource] + if !metricDefined { + glog.Warningf("Heapster metric not defined for %s %v", reference, hpa.Spec.Target.Resource) + continue + } + startTime := time.Now().Add(heapsterQueryStart) + metricPath := fmt.Sprintf("/api/v1/model/namespaces/%s/pod-list/%s/metrics/%s", + hpa.Spec.ScaleRef.Namespace, + strings.Join(podNames, ","), + metric) + + resultRaw, err := a.client. + Get(). + Prefix("proxy"). + Resource("services"). + Namespace(heapsterNamespace). + Name(heapsterService). + Suffix(metricPath). + Param("start", startTime.Format(time.RFC3339)). + Do(). + Raw() + + if err != nil { + glog.Warningf("Failed to get pods metrics for %s: %v", reference, err) + continue + } + + glog.Infof("Metrics available for %s: %s", reference, string(resultRaw)) + } return nil } diff --git a/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go b/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go new file mode 100644 index 0000000000..b7ab76b1cf --- /dev/null +++ b/pkg/controller/autoscaler/horizontalpodautoscaler_controller_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package autoscalercontroller + +import ( + "fmt" + "net/http" + "net/http/httptest" + "testing" + + "k8s.io/kubernetes/pkg/api" + _ "k8s.io/kubernetes/pkg/api/latest" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/testapi" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/expapi" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + + "github.com/golang/glog" +) + +const ( + namespace = api.NamespaceDefault + rcName = "app-rc" + podNameLabel = "app" + podName = "p1" +) + +var target = expapi.ResourceConsumption{Resource: api.ResourceCPU, Quantity: resource.MustParse("0.8")} + +type serverResponse struct { + statusCode int + obj interface{} +} + +func makeTestServer(t *testing.T, hpaResponse serverResponse, + scaleResponse serverResponse, podListResponse serverResponse, + heapsterResponse serverResponse) (*httptest.Server, []*util.FakeHandler) { + + handlers := []*util.FakeHandler{} + mux := http.NewServeMux() + + mkHandler := func(url string, response serverResponse) *util.FakeHandler { + handler := util.FakeHandler{ + StatusCode: response.statusCode, + ResponseBody: runtime.EncodeOrDie(testapi.Codec(), response.obj.(runtime.Object)), + } + mux.Handle(url, &handler) + glog.Infof("Will handle %s", url) + return &handler + } + + mkRawHandler := func(url string, response serverResponse) *util.FakeHandler { + handler := util.FakeHandler{ + StatusCode: response.statusCode, + ResponseBody: *response.obj.(*string), + } + mux.Handle(url, &handler) + glog.Infof("Will handle %s", url) + return &handler + } + + handlers = append(handlers, mkHandler("/experimental/v1/horizontalpodautoscalers", hpaResponse)) + handlers = append(handlers, mkHandler( + fmt.Sprintf("/experimental/v1/namespaces/%s/replicationcontrollers/%s/scale", namespace, rcName), scaleResponse)) + handlers = append(handlers, mkHandler(fmt.Sprintf("/api/v1/namespaces/%s/pods", namespace), podListResponse)) + handlers = append(handlers, mkRawHandler( + fmt.Sprintf("/api/v1/proxy/namespaces/kube-system/services/monitoring-heapster/api/v1/model/namespaces/%s/pod-list/%s/metrics/cpu-usage", + namespace, podName), heapsterResponse)) + + mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { + t.Errorf("unexpected request: %v", req.RequestURI) + res.WriteHeader(http.StatusNotFound) + }) + return httptest.NewServer(mux), handlers +} + +func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { + + hpaResponse := serverResponse{http.StatusOK, &expapi.HorizontalPodAutoscalerList{ + Items: []expapi.HorizontalPodAutoscaler{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: namespace, + }, + Spec: expapi.HorizontalPodAutoscalerSpec{ + ScaleRef: &expapi.SubresourceReference{ + Kind: "replicationController", + Name: rcName, + Namespace: namespace, + Subresource: "scale", + }, + MinCount: 1, + MaxCount: 5, + Target: target, + }, + }}}} + + scaleResponse := serverResponse{http.StatusOK, &expapi.Scale{ + ObjectMeta: api.ObjectMeta{ + Name: "rcName", + Namespace: namespace, + }, + Spec: expapi.ScaleSpec{ + Replicas: 5, + }, + Status: expapi.ScaleStatus{ + Replicas: 2, + Selector: map[string]string{"name": podNameLabel}, + }, + }} + + podListResponse := serverResponse{http.StatusOK, &api.PodList{ + Items: []api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: podName, + Namespace: namespace, + }, + }}}} + + heapsterRawResponse := "UPADTE ME" + heapsterResponse := serverResponse{http.StatusOK, &heapsterRawResponse} + + testServer, handlers := makeTestServer(t, hpaResponse, scaleResponse, podListResponse, heapsterResponse) + defer testServer.Close() + kubeClient := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + expClient := client.NewExperimentalOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + + hpaController := New(kubeClient, expClient) + err := hpaController.reconcileAutoscalers() + if err != nil { + t.Fatal("Failed to reconcile %v", err) + } + for _, h := range handlers { + h.ValidateRequestCount(t, 1) + } +}