From 9006eadcfee2e0493590fce7dc2968db2779bb43 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Fri, 15 Aug 2014 17:14:22 -0400 Subject: [PATCH] kube-proxy can read config from the apiserver All clients that talk to a "master" as a host:port or URL (scheme://host:port) parameter. Add tests. --- cmd/controller-manager/controller-manager.go | 2 +- cmd/proxy/proxy.go | 38 +++-- hack/local-up-cluster.sh | 2 +- pkg/client/client.go | 53 ++++++- pkg/client/client_test.go | 27 ++++ pkg/client/fake.go | 15 +- pkg/proxy/config/api.go | 145 +++++++++++++++++++ pkg/proxy/config/api_test.go | 116 +++++++++++++++ pkg/util/wait/wait.go | 12 ++ 9 files changed, 393 insertions(+), 17 deletions(-) create mode 100644 pkg/proxy/config/api.go create mode 100644 pkg/proxy/config/api_test.go diff --git a/cmd/controller-manager/controller-manager.go b/cmd/controller-manager/controller-manager.go index d32992b120..28f6303c7b 100644 --- a/cmd/controller-manager/controller-manager.go +++ b/cmd/controller-manager/controller-manager.go @@ -47,7 +47,7 @@ func main() { } controllerManager := controller.NewReplicationManager( - client.New("http://"+*master, nil)) + client.New(*master, nil)) controllerManager.Run(10 * time.Second) select {} diff --git a/cmd/proxy/proxy.go b/cmd/proxy/proxy.go index 63a82fcf4f..af16a8c17b 100644 --- a/cmd/proxy/proxy.go +++ b/cmd/proxy/proxy.go @@ -18,7 +18,9 @@ package main import ( "flag" + "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy" "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -29,11 +31,12 @@ import ( var ( configFile = flag.String("configfile", "/tmp/proxy_config", "Configuration file for the proxy") + master = flag.String("master", "", "The address of the Kubernetes API server (optional)") etcdServerList util.StringList ) func init() { - flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated") + flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional)") } func main() { @@ -43,24 +46,39 @@ func main() { verflag.PrintAndExitIfRequested() - // Set up logger for etcd client - etcd.SetLogger(util.NewLogger("etcd ")) - - glog.Infof("Using configuration file %s and etcd_servers %v", *configFile, etcdServerList) - serviceConfig := config.NewServiceConfig() endpointsConfig := config.NewEndpointsConfig() + // define api config source + if *master != "" { + glog.Infof("Using api calls to get config %v", *master) + //TODO: add auth info + client := client.New(*master, nil) + config.NewSourceAPI( + client, + 30*time.Second, + serviceConfig.Channel("api"), + endpointsConfig.Channel("api"), + ) + } + // Create a configuration source that handles configuration from etcd. - etcdClient := etcd.NewClient(etcdServerList) - config.NewConfigSourceEtcd(etcdClient, - serviceConfig.Channel("etcd"), - endpointsConfig.Channel("etcd")) + if len(etcdServerList) > 0 && *master == "" { + glog.Infof("Using etcd servers %v", etcdServerList) + + // Set up logger for etcd client + etcd.SetLogger(util.NewLogger("etcd ")) + etcdClient := etcd.NewClient(etcdServerList) + config.NewConfigSourceEtcd(etcdClient, + serviceConfig.Channel("etcd"), + endpointsConfig.Channel("etcd")) + } // And create a configuration source that reads from a local file config.NewConfigSourceFile(*configFile, serviceConfig.Channel("file"), endpointsConfig.Channel("file")) + glog.Infof("Using configuration file %s", *configFile) loadBalancer := proxy.NewLoadBalancerRR() proxier := proxy.NewProxier(loadBalancer) diff --git a/hack/local-up-cluster.sh b/hack/local-up-cluster.sh index b4a42741d0..372263096f 100755 --- a/hack/local-up-cluster.sh +++ b/hack/local-up-cluster.sh @@ -66,7 +66,7 @@ KUBELET_PID=$! PROXY_LOG=/tmp/kube-proxy.log ${GO_OUT}/proxy \ - --etcd_servers="http://127.0.0.1:4001" &> ${PROXY_LOG} & + --master="http://${API_HOST}:${API_PORT}" &> ${PROXY_LOG} & PROXY_PID=$! echo "Local Kubernetes cluster is running. Press Ctrl-C to shut it down." diff --git a/pkg/client/client.go b/pkg/client/client.go index d624442910..7746cbcc4e 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -23,6 +23,7 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -67,6 +68,9 @@ type ServiceInterface interface { CreateService(api.Service) (api.Service, error) UpdateService(api.Service) (api.Service, error) DeleteService(string) error + WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + + WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) } // VersionInterface has a method to retrieve the server version @@ -183,7 +187,12 @@ func (c *RESTClient) doRequest(request *http.Request) ([]byte, error) { // requestBody is the body of the request. Can be nil. // target the interface to marshal the JSON response into. Can be nil. func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, target interface{}) ([]byte, error) { - request, err := http.NewRequest(method, c.makeURL(path), requestBody) + reqUrl, err := c.makeURL(path) + if err != nil { + return nil, err + } + + request, err := http.NewRequest(method, reqUrl, requestBody) if err != nil { return nil, err } @@ -201,8 +210,24 @@ func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, targ return body, err } -func (c *RESTClient) makeURL(path string) string { - return c.host + c.Prefix + path +func (c *RESTClient) makeURL(path string) (string, error) { + base := c.host + hostURL, err := url.Parse(base) + if err != nil { + return "", err + } + if hostURL.Scheme == "" { + hostURL, err = url.Parse("http://" + base) + if err != nil { + return "", err + } + if hostURL.Path != "" && hostURL.Path != "/" { + return "", fmt.Errorf("host must be a URL or a host:port pair: %s", base) + } + } + hostURL.Path += c.Prefix + path + + return hostURL.String(), nil } // ListPods takes a selector, and returns the list of pods that match that selector @@ -309,6 +334,28 @@ func (c *Client) DeleteService(name string) error { return c.Delete().Path("services").Path(name).Do().Error() } +// WatchService returns a watch.Interface that watches the requested services. +func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("services"). + UintParam("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + +// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service. +func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return c.Get(). + Path("watch"). + Path("endpoints"). + UintParam("resourceVersion", resourceVersion). + SelectorParam("labels", label). + SelectorParam("fields", field). + Watch() +} + // ServerVersion retrieves and parses the server's version. func (c *Client) ServerVersion() (*version.Info, error) { body, err := c.Get().AbsPath("/version").Do().Raw() diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 21c2d440d2..269d0c0237 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -37,6 +37,33 @@ func makeURL(suffix string) string { return apiPath + suffix } +func TestValidatesHostParameter(t *testing.T) { + testCases := map[string]struct { + Value string + Err bool + }{ + "foo.bar.com": {"http://foo.bar.com/api/v1beta1/", false}, + "http://host/server": {"http://host/server/api/v1beta1/", false}, + "host/server": {"", true}, + } + for k, expected := range testCases { + c := RESTClient{host: k, Prefix: "/api/v1beta1/"} + actual, err := c.makeURL("") + switch { + case err == nil && expected.Err: + t.Errorf("expected error but was nil") + continue + case err != nil && !expected.Err: + t.Errorf("unexpected error %v", err) + continue + } + if expected.Value != actual { + t.Errorf("%s: expected %s, got %s", k, expected.Value, actual) + continue + } + } +} + func TestListEmptyPods(t *testing.T) { c := &testClient{ Request: testRequest{Method: "GET", Path: "/pods"}, diff --git a/pkg/client/fake.go b/pkg/client/fake.go index b03f599a1c..dbe49d26ac 100644 --- a/pkg/client/fake.go +++ b/pkg/client/fake.go @@ -35,6 +35,7 @@ type Fake struct { Actions []FakeAction Pods api.PodList Ctrl api.ReplicationController + Watch watch.Interface } func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) { @@ -88,8 +89,8 @@ func (c *Fake) DeleteReplicationController(controller string) error { } func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { - c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers"}) - return watch.NewFake(), nil + c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion}) + return c.Watch, nil } func (c *Fake) GetService(name string) (api.Service, error) { @@ -112,6 +113,16 @@ func (c *Fake) DeleteService(service string) error { return nil } +func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion}) + return c.Watch, nil +} + +func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) + return c.Watch, nil +} + func (c *Fake) ServerVersion() (*version.Info, error) { c.Actions = append(c.Actions, FakeAction{Action: "get-version", Value: nil}) versionInfo := version.Get() diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go new file mode 100644 index 0000000000..e9e5d625dc --- /dev/null +++ b/pkg/proxy/config/api.go @@ -0,0 +1,145 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/golang/glog" +) + +// Watcher is the interface needed to receive changes to services and endpoints +type Watcher interface { + WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) + WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) +} + +// SourceAPI implements a configuration source for services and endpoints that +// uses the client watch API to efficiently detect changes. +type SourceAPI struct { + client Watcher + services chan<- ServiceUpdate + endpoints chan<- EndpointsUpdate + + waitDuration time.Duration + reconnectDuration time.Duration +} + +// NewSourceAPI creates a config source that watches for changes to the services and endpoints +func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI { + config := &SourceAPI{ + client: client, + services: services, + endpoints: endpoints, + + waitDuration: period, + // prevent hot loops if the server starts to misbehave + reconnectDuration: time.Second * 1, + } + serviceVersion := uint64(0) + go util.Forever(func() { + config.runServices(&serviceVersion) + time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) + }, period) + endpointVersion := uint64(0) + go util.Forever(func() { + config.runEndpoints(&endpointVersion) + time.Sleep(wait.Jitter(config.reconnectDuration, 0.0)) + }, period) + return config +} + +// runServices loops forever looking for changes to services +func (s *SourceAPI) runServices(resourceVersion *uint64) { + watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion) + if err != nil { + glog.Errorf("Unable to watch for services changes: %v", err) + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) + return + } + defer watcher.Stop() + + ch := watcher.ResultChan() + handleServicesWatch(resourceVersion, ch, s.services) +} + +// handleServicesWatch loops over an event channel and delivers config changes to an update channel +func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- ServiceUpdate) { + for { + select { + case event, ok := <-ch: + if !ok { + glog.V(2).Infof("WatchServices channel closed") + return + } + + service := event.Object.(*api.Service) + *resourceVersion = service.ResourceVersion + 1 + + switch event.Type { + case watch.Added, watch.Modified: + updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}} + + case watch.Deleted: + updates <- ServiceUpdate{Op: SET} + } + } + } +} + +// runEndpoints loops forever looking for changes to endpoints +func (s *SourceAPI) runEndpoints(resourceVersion *uint64) { + watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion) + if err != nil { + glog.Errorf("Unable to watch for endpoints changes: %v", err) + time.Sleep(wait.Jitter(s.waitDuration, 0.0)) + return + } + defer watcher.Stop() + + ch := watcher.ResultChan() + handleEndpointsWatch(resourceVersion, ch, s.endpoints) +} + +// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel +func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- EndpointsUpdate) { + for { + select { + case event, ok := <-ch: + if !ok { + glog.V(2).Infof("WatchEndpoints channel closed") + return + } + + endpoints := event.Object.(*api.Endpoints) + *resourceVersion = endpoints.ResourceVersion + 1 + + switch event.Type { + case watch.Added, watch.Modified: + updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}} + + case watch.Deleted: + updates <- EndpointsUpdate{Op: SET} + } + } + } +} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go new file mode 100644 index 0000000000..f08c6f0d2c --- /dev/null +++ b/pkg/proxy/config/api_test.go @@ -0,0 +1,116 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestServices(t *testing.T) { + service := api.Service{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}} + + fakeWatch := watch.NewFake() + fakeClient := &client.Fake{Watch: fakeWatch} + services := make(chan ServiceUpdate) + source := SourceAPI{client: fakeClient, services: services} + resourceVersion := uint64(0) + go func() { + // called twice + source.runServices(&resourceVersion) + source.runServices(&resourceVersion) + }() + + // test adding a service to the watch + fakeWatch.Add(&service) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}}) { + t.Errorf("expected call to watch-services, got %#v", fakeClient) + } + + actual := <-services + expected := ServiceUpdate{Op: SET, Services: []api.Service{service}} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that a delete results in a config change + fakeWatch.Delete(&service) + actual = <-services + expected = ServiceUpdate{Op: SET} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that closing the channel results in a new call to WatchServices with a higher resource version + newFakeWatch := watch.NewFake() + fakeClient.Watch = newFakeWatch + fakeWatch.Stop() + + newFakeWatch.Add(&service) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}, {"watch-services", uint64(3)}}) { + t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + } +} + +func TestEndpoints(t *testing.T) { + endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}} + + fakeWatch := watch.NewFake() + fakeClient := &client.Fake{Watch: fakeWatch} + endpoints := make(chan EndpointsUpdate) + source := SourceAPI{client: fakeClient, endpoints: endpoints} + resourceVersion := uint64(0) + go func() { + // called twice + source.runEndpoints(&resourceVersion) + source.runEndpoints(&resourceVersion) + }() + + // test adding an endpoint to the watch + fakeWatch.Add(&endpoint) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}}) { + t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + } + + actual := <-endpoints + expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that a delete results in a config change + fakeWatch.Delete(&endpoint) + actual = <-endpoints + expected = EndpointsUpdate{Op: SET} + if !reflect.DeepEqual(expected, actual) { + t.Errorf("expected %#v, got %#v", expected, actual) + } + + // verify that closing the channel results in a new call to WatchEndpoints with a higher resource version + newFakeWatch := watch.NewFake() + fakeClient.Watch = newFakeWatch + fakeWatch.Stop() + + newFakeWatch.Add(&endpoint) + if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}, {"watch-endpoints", uint64(3)}}) { + t.Errorf("expected call to watch-endpoints, got %#v", fakeClient) + } +} diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go index d4c4714237..e22c6461de 100644 --- a/pkg/util/wait/wait.go +++ b/pkg/util/wait/wait.go @@ -18,9 +18,21 @@ package wait import ( "errors" + "math/rand" "time" ) +// Jitter returns a time.Duration between duration and duration + maxFactor * duration, +// to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a +// suggested default value will be chosen. +func Jitter(duration time.Duration, maxFactor float64) time.Duration { + if maxFactor <= 0.0 { + maxFactor = 1.0 + } + wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) + return wait +} + // ErrWaitTimeout is returned when the condition exited without success var ErrWaitTimeout = errors.New("timed out waiting for the condition")