Enable specifying scheme/port for metrics client

pull/6/head
Jordan Liggitt 2015-11-06 12:55:31 -05:00
parent 65c285f5ab
commit 165d7d5030
8 changed files with 45 additions and 12 deletions

View File

@ -342,7 +342,7 @@ func (s *CMServer) Run(_ []string) error {
glog.Infof("Starting %s apis", groupVersion)
if containsResource(resources, "horizontalpodautoscalers") {
glog.Infof("Starting horizontal pod controller.")
metricsClient := metrics.NewHeapsterMetricsClient(kubeClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterService)
metricsClient := metrics.NewHeapsterMetricsClient(kubeClient, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterPort, metrics.DefaultHeapsterService)
podautoscaler.NewHorizontalController(kubeClient, metricsClient).
Run(s.HorizontalPodAutoscalerSyncPeriod)
}

View File

@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
)
@ -36,7 +37,7 @@ type ServiceInterface interface {
Update(srv *api.Service) (*api.Service, error)
Delete(name string) error
Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error)
ProxyGet(name, path string, params map[string]string) ResponseWrapper
ProxyGet(scheme, name, port, path string, params map[string]string) ResponseWrapper
}
// services implements ServicesNamespacer interface
@ -102,12 +103,12 @@ func (c *services) Watch(label labels.Selector, field fields.Selector, opts api.
}
// ProxyGet returns a response of the service by calling it through the proxy.
func (c *services) ProxyGet(name, path string, params map[string]string) ResponseWrapper {
func (c *services) ProxyGet(scheme, name, port, path string, params map[string]string) ResponseWrapper {
request := c.r.Get().
Prefix("proxy").
Namespace(c.ns).
Resource("services").
Name(name).
Name(util.JoinSchemeNamePort(scheme, name, port)).
Suffix(path)
for k, v := range params {
request = request.Param(k, v)

View File

@ -166,6 +166,18 @@ func TestServiceProxyGet(t *testing.T) {
},
Response: Response{StatusCode: 200, RawBody: &body},
}
response, err := c.Setup(t).Services(ns).ProxyGet("service-1", "foo", map[string]string{"param-name": "param-value"}).DoRaw()
response, err := c.Setup(t).Services(ns).ProxyGet("", "service-1", "", "foo", map[string]string{"param-name": "param-value"}).DoRaw()
c.ValidateRaw(t, response, err)
// With scheme and port specified
c = &testClient{
Request: testRequest{
Method: "GET",
Path: testapi.Default.ResourcePathWithPrefix("proxy", "services", ns, "https:service-1:my-port") + "/foo",
Query: buildQueryValues(url.Values{"param-name": []string{"param-value"}}),
},
Response: Response{StatusCode: 200, RawBody: &body},
}
response, err = c.Setup(t).Services(ns).ProxyGet("https", "service-1", "my-port", "foo", map[string]string{"param-name": "param-value"}).DoRaw()
c.ValidateRaw(t, response, err)
}

View File

@ -169,12 +169,14 @@ func NewWatchAction(resource, namespace string, label labels.Selector, field fie
return action
}
func NewProxyGetAction(resource, namespace, name, path string, params map[string]string) ProxyGetActionImpl {
func NewProxyGetAction(resource, namespace, scheme, name, port, path string, params map[string]string) ProxyGetActionImpl {
action := ProxyGetActionImpl{}
action.Verb = "get"
action.Resource = resource
action.Namespace = namespace
action.Scheme = scheme
action.Name = name
action.Port = port
action.Path = path
action.Params = params
return action
@ -235,7 +237,9 @@ type WatchAction interface {
type ProxyGetAction interface {
Action
GetScheme() string
GetName() string
GetPort() string
GetPath() string
GetParams() map[string]string
}
@ -338,15 +342,25 @@ func (a WatchActionImpl) GetWatchRestrictions() WatchRestrictions {
type ProxyGetActionImpl struct {
ActionImpl
Scheme string
Name string
Port string
Path string
Params map[string]string
}
func (a ProxyGetActionImpl) GetScheme() string {
return a.Scheme
}
func (a ProxyGetActionImpl) GetName() string {
return a.Name
}
func (a ProxyGetActionImpl) GetPort() string {
return a.Port
}
func (a ProxyGetActionImpl) GetPath() string {
return a.Path
}

View File

@ -76,6 +76,6 @@ func (c *FakeServices) Watch(label labels.Selector, field fields.Selector, opts
return c.Fake.InvokesWatch(NewWatchAction("services", c.Namespace, label, field, opts))
}
func (c *FakeServices) ProxyGet(name, path string, params map[string]string) unversioned.ResponseWrapper {
return c.Fake.InvokesProxy(NewProxyGetAction("services", c.Namespace, name, path, params))
func (c *FakeServices) ProxyGet(scheme, name, port, path string, params map[string]string) unversioned.ResponseWrapper {
return c.Fake.InvokesProxy(NewProxyGetAction("services", c.Namespace, scheme, name, port, path, params))
}

View File

@ -205,7 +205,7 @@ func (tc *testCase) verifyResults(t *testing.T) {
func (tc *testCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterService)
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
hpaController := NewHorizontalController(testClient, metricsClient)
err := hpaController.reconcileAutoscalers()
assert.Equal(t, nil, err)

View File

@ -34,7 +34,9 @@ import (
const (
DefaultHeapsterNamespace = "kube-system"
DefaultHeapsterScheme = "http"
DefaultHeapsterService = "heapster"
DefaultHeapsterPort = "" // use the first exposed port on the service
)
var heapsterQueryStart = -5 * time.Minute
@ -67,7 +69,9 @@ type HeapsterMetricsClient struct {
client client.Interface
resourceDefinitions map[api.ResourceName]metricDefinition
heapsterNamespace string
heapsterScheme string
heapsterService string
heapsterPort string
}
var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
@ -93,12 +97,14 @@ var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
}
// NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface.
func NewHeapsterMetricsClient(client client.Interface, namespace, service string) *HeapsterMetricsClient {
func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient {
return &HeapsterMetricsClient{
client: client,
resourceDefinitions: heapsterMetricDefinitions,
heapsterNamespace: namespace,
heapsterScheme: scheme,
heapsterService: service,
heapsterPort: port,
}
}
@ -160,7 +166,7 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp
metricSpec.name)
resultRaw, err := h.client.Services(h.heapsterNamespace).
ProxyGet(h.heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
DoRaw()
if err != nil {

View File

@ -148,7 +148,7 @@ func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err er
func (tc *testCase) runTest(t *testing.T) {
testClient := tc.prepareTestClient(t)
metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterService)
metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
val, _, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
tc.verifyResults(t, val, err)
}