mirror of https://github.com/k3s-io/k3s
commit
849280f18d
|
@ -342,7 +342,7 @@ func (s *CMServer) Run(_ []string) error {
|
||||||
glog.Infof("Starting %s apis", groupVersion)
|
glog.Infof("Starting %s apis", groupVersion)
|
||||||
if containsResource(resources, "horizontalpodautoscalers") {
|
if containsResource(resources, "horizontalpodautoscalers") {
|
||||||
glog.Infof("Starting horizontal pod controller.")
|
glog.Infof("Starting horizontal pod controller.")
|
||||||
metricsClient := metrics.NewHeapsterMetricsClient(kubeClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterService)
|
metricsClient := metrics.NewHeapsterMetricsClient(kubeClient, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterPort, metrics.DefaultHeapsterService)
|
||||||
podautoscaler.NewHorizontalController(kubeClient, metricsClient).
|
podautoscaler.NewHorizontalController(kubeClient, metricsClient).
|
||||||
Run(s.HorizontalPodAutoscalerSyncPeriod)
|
Run(s.HorizontalPodAutoscalerSyncPeriod)
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
|
"k8s.io/kubernetes/pkg/util"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,7 +37,7 @@ type ServiceInterface interface {
|
||||||
Update(srv *api.Service) (*api.Service, error)
|
Update(srv *api.Service) (*api.Service, error)
|
||||||
Delete(name string) error
|
Delete(name string) error
|
||||||
Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error)
|
Watch(label labels.Selector, field fields.Selector, opts api.ListOptions) (watch.Interface, error)
|
||||||
ProxyGet(name, path string, params map[string]string) ResponseWrapper
|
ProxyGet(scheme, name, port, path string, params map[string]string) ResponseWrapper
|
||||||
}
|
}
|
||||||
|
|
||||||
// services implements ServicesNamespacer interface
|
// services implements ServicesNamespacer interface
|
||||||
|
@ -102,12 +103,12 @@ func (c *services) Watch(label labels.Selector, field fields.Selector, opts api.
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProxyGet returns a response of the service by calling it through the proxy.
|
// ProxyGet returns a response of the service by calling it through the proxy.
|
||||||
func (c *services) ProxyGet(name, path string, params map[string]string) ResponseWrapper {
|
func (c *services) ProxyGet(scheme, name, port, path string, params map[string]string) ResponseWrapper {
|
||||||
request := c.r.Get().
|
request := c.r.Get().
|
||||||
Prefix("proxy").
|
Prefix("proxy").
|
||||||
Namespace(c.ns).
|
Namespace(c.ns).
|
||||||
Resource("services").
|
Resource("services").
|
||||||
Name(name).
|
Name(util.JoinSchemeNamePort(scheme, name, port)).
|
||||||
Suffix(path)
|
Suffix(path)
|
||||||
for k, v := range params {
|
for k, v := range params {
|
||||||
request = request.Param(k, v)
|
request = request.Param(k, v)
|
||||||
|
|
|
@ -166,6 +166,18 @@ func TestServiceProxyGet(t *testing.T) {
|
||||||
},
|
},
|
||||||
Response: Response{StatusCode: 200, RawBody: &body},
|
Response: Response{StatusCode: 200, RawBody: &body},
|
||||||
}
|
}
|
||||||
response, err := c.Setup(t).Services(ns).ProxyGet("service-1", "foo", map[string]string{"param-name": "param-value"}).DoRaw()
|
response, err := c.Setup(t).Services(ns).ProxyGet("", "service-1", "", "foo", map[string]string{"param-name": "param-value"}).DoRaw()
|
||||||
|
c.ValidateRaw(t, response, err)
|
||||||
|
|
||||||
|
// With scheme and port specified
|
||||||
|
c = &testClient{
|
||||||
|
Request: testRequest{
|
||||||
|
Method: "GET",
|
||||||
|
Path: testapi.Default.ResourcePathWithPrefix("proxy", "services", ns, "https:service-1:my-port") + "/foo",
|
||||||
|
Query: buildQueryValues(url.Values{"param-name": []string{"param-value"}}),
|
||||||
|
},
|
||||||
|
Response: Response{StatusCode: 200, RawBody: &body},
|
||||||
|
}
|
||||||
|
response, err = c.Setup(t).Services(ns).ProxyGet("https", "service-1", "my-port", "foo", map[string]string{"param-name": "param-value"}).DoRaw()
|
||||||
c.ValidateRaw(t, response, err)
|
c.ValidateRaw(t, response, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,12 +169,14 @@ func NewWatchAction(resource, namespace string, label labels.Selector, field fie
|
||||||
return action
|
return action
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProxyGetAction(resource, namespace, name, path string, params map[string]string) ProxyGetActionImpl {
|
func NewProxyGetAction(resource, namespace, scheme, name, port, path string, params map[string]string) ProxyGetActionImpl {
|
||||||
action := ProxyGetActionImpl{}
|
action := ProxyGetActionImpl{}
|
||||||
action.Verb = "get"
|
action.Verb = "get"
|
||||||
action.Resource = resource
|
action.Resource = resource
|
||||||
action.Namespace = namespace
|
action.Namespace = namespace
|
||||||
|
action.Scheme = scheme
|
||||||
action.Name = name
|
action.Name = name
|
||||||
|
action.Port = port
|
||||||
action.Path = path
|
action.Path = path
|
||||||
action.Params = params
|
action.Params = params
|
||||||
return action
|
return action
|
||||||
|
@ -235,7 +237,9 @@ type WatchAction interface {
|
||||||
|
|
||||||
type ProxyGetAction interface {
|
type ProxyGetAction interface {
|
||||||
Action
|
Action
|
||||||
|
GetScheme() string
|
||||||
GetName() string
|
GetName() string
|
||||||
|
GetPort() string
|
||||||
GetPath() string
|
GetPath() string
|
||||||
GetParams() map[string]string
|
GetParams() map[string]string
|
||||||
}
|
}
|
||||||
|
@ -338,15 +342,25 @@ func (a WatchActionImpl) GetWatchRestrictions() WatchRestrictions {
|
||||||
|
|
||||||
type ProxyGetActionImpl struct {
|
type ProxyGetActionImpl struct {
|
||||||
ActionImpl
|
ActionImpl
|
||||||
|
Scheme string
|
||||||
Name string
|
Name string
|
||||||
|
Port string
|
||||||
Path string
|
Path string
|
||||||
Params map[string]string
|
Params map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a ProxyGetActionImpl) GetScheme() string {
|
||||||
|
return a.Scheme
|
||||||
|
}
|
||||||
|
|
||||||
func (a ProxyGetActionImpl) GetName() string {
|
func (a ProxyGetActionImpl) GetName() string {
|
||||||
return a.Name
|
return a.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a ProxyGetActionImpl) GetPort() string {
|
||||||
|
return a.Port
|
||||||
|
}
|
||||||
|
|
||||||
func (a ProxyGetActionImpl) GetPath() string {
|
func (a ProxyGetActionImpl) GetPath() string {
|
||||||
return a.Path
|
return a.Path
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,6 @@ func (c *FakeServices) Watch(label labels.Selector, field fields.Selector, opts
|
||||||
return c.Fake.InvokesWatch(NewWatchAction("services", c.Namespace, label, field, opts))
|
return c.Fake.InvokesWatch(NewWatchAction("services", c.Namespace, label, field, opts))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FakeServices) ProxyGet(name, path string, params map[string]string) unversioned.ResponseWrapper {
|
func (c *FakeServices) ProxyGet(scheme, name, port, path string, params map[string]string) unversioned.ResponseWrapper {
|
||||||
return c.Fake.InvokesProxy(NewProxyGetAction("services", c.Namespace, name, path, params))
|
return c.Fake.InvokesProxy(NewProxyGetAction("services", c.Namespace, scheme, name, port, path, params))
|
||||||
}
|
}
|
||||||
|
|
|
@ -205,7 +205,7 @@ func (tc *testCase) verifyResults(t *testing.T) {
|
||||||
|
|
||||||
func (tc *testCase) runTest(t *testing.T) {
|
func (tc *testCase) runTest(t *testing.T) {
|
||||||
testClient := tc.prepareTestClient(t)
|
testClient := tc.prepareTestClient(t)
|
||||||
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterService)
|
metricsClient := metrics.NewHeapsterMetricsClient(testClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort)
|
||||||
hpaController := NewHorizontalController(testClient, metricsClient)
|
hpaController := NewHorizontalController(testClient, metricsClient)
|
||||||
err := hpaController.reconcileAutoscalers()
|
err := hpaController.reconcileAutoscalers()
|
||||||
assert.Equal(t, nil, err)
|
assert.Equal(t, nil, err)
|
||||||
|
|
|
@ -34,7 +34,9 @@ import (
|
||||||
|
|
||||||
const (
|
const (
|
||||||
DefaultHeapsterNamespace = "kube-system"
|
DefaultHeapsterNamespace = "kube-system"
|
||||||
|
DefaultHeapsterScheme = "http"
|
||||||
DefaultHeapsterService = "heapster"
|
DefaultHeapsterService = "heapster"
|
||||||
|
DefaultHeapsterPort = "" // use the first exposed port on the service
|
||||||
)
|
)
|
||||||
|
|
||||||
var heapsterQueryStart = -5 * time.Minute
|
var heapsterQueryStart = -5 * time.Minute
|
||||||
|
@ -67,7 +69,9 @@ type HeapsterMetricsClient struct {
|
||||||
client client.Interface
|
client client.Interface
|
||||||
resourceDefinitions map[api.ResourceName]metricDefinition
|
resourceDefinitions map[api.ResourceName]metricDefinition
|
||||||
heapsterNamespace string
|
heapsterNamespace string
|
||||||
|
heapsterScheme string
|
||||||
heapsterService string
|
heapsterService string
|
||||||
|
heapsterPort string
|
||||||
}
|
}
|
||||||
|
|
||||||
var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
|
var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
|
||||||
|
@ -93,12 +97,14 @@ var heapsterMetricDefinitions = map[api.ResourceName]metricDefinition{
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface.
|
// NewHeapsterMetricsClient returns a new instance of Heapster-based implementation of MetricsClient interface.
|
||||||
func NewHeapsterMetricsClient(client client.Interface, namespace, service string) *HeapsterMetricsClient {
|
func NewHeapsterMetricsClient(client client.Interface, namespace, scheme, service, port string) *HeapsterMetricsClient {
|
||||||
return &HeapsterMetricsClient{
|
return &HeapsterMetricsClient{
|
||||||
client: client,
|
client: client,
|
||||||
resourceDefinitions: heapsterMetricDefinitions,
|
resourceDefinitions: heapsterMetricDefinitions,
|
||||||
heapsterNamespace: namespace,
|
heapsterNamespace: namespace,
|
||||||
|
heapsterScheme: scheme,
|
||||||
heapsterService: service,
|
heapsterService: service,
|
||||||
|
heapsterPort: port,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +166,7 @@ func (h *HeapsterMetricsClient) getForPods(resourceName api.ResourceName, namesp
|
||||||
metricSpec.name)
|
metricSpec.name)
|
||||||
|
|
||||||
resultRaw, err := h.client.Services(h.heapsterNamespace).
|
resultRaw, err := h.client.Services(h.heapsterNamespace).
|
||||||
ProxyGet(h.heapsterService, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
|
ProxyGet(h.heapsterScheme, h.heapsterService, h.heapsterPort, metricPath, map[string]string{"start": startTime.Format(time.RFC3339)}).
|
||||||
DoRaw()
|
DoRaw()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -148,7 +148,7 @@ func (tc *testCase) verifyResults(t *testing.T, val *ResourceConsumption, err er
|
||||||
|
|
||||||
func (tc *testCase) runTest(t *testing.T) {
|
func (tc *testCase) runTest(t *testing.T) {
|
||||||
testClient := tc.prepareTestClient(t)
|
testClient := tc.prepareTestClient(t)
|
||||||
metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterService)
|
metricsClient := NewHeapsterMetricsClient(testClient, DefaultHeapsterNamespace, DefaultHeapsterScheme, DefaultHeapsterService, DefaultHeapsterPort)
|
||||||
val, _, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
|
val, _, err := metricsClient.GetResourceConsumptionAndRequest(tc.targetResource, tc.namespace, tc.selector)
|
||||||
tc.verifyResults(t, val, err)
|
tc.verifyResults(t, val, err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue