kube-proxy can read config from the apiserver

All clients that talk to a "master" as a host:port or URL
(scheme://host:port) parameter.  Add tests.
pull/6/head
Clayton Coleman 2014-08-15 17:14:22 -04:00
parent 083d81b6d7
commit 9006eadcfe
9 changed files with 393 additions and 17 deletions

View File

@ -47,7 +47,7 @@ func main() {
}
controllerManager := controller.NewReplicationManager(
client.New("http://"+*master, nil))
client.New(*master, nil))
controllerManager.Run(10 * time.Second)
select {}

View File

@ -18,7 +18,9 @@ package main
import (
"flag"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy"
"github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -29,11 +31,12 @@ import (
var (
configFile = flag.String("configfile", "/tmp/proxy_config", "Configuration file for the proxy")
master = flag.String("master", "", "The address of the Kubernetes API server (optional)")
etcdServerList util.StringList
)
func init() {
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated")
flag.Var(&etcdServerList, "etcd_servers", "List of etcd servers to watch (http://ip:port), comma separated (optional)")
}
func main() {
@ -43,24 +46,39 @@ func main() {
verflag.PrintAndExitIfRequested()
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
glog.Infof("Using configuration file %s and etcd_servers %v", *configFile, etcdServerList)
serviceConfig := config.NewServiceConfig()
endpointsConfig := config.NewEndpointsConfig()
// define api config source
if *master != "" {
glog.Infof("Using api calls to get config %v", *master)
//TODO: add auth info
client := client.New(*master, nil)
config.NewSourceAPI(
client,
30*time.Second,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
}
// Create a configuration source that handles configuration from etcd.
etcdClient := etcd.NewClient(etcdServerList)
config.NewConfigSourceEtcd(etcdClient,
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
if len(etcdServerList) > 0 && *master == "" {
glog.Infof("Using etcd servers %v", etcdServerList)
// Set up logger for etcd client
etcd.SetLogger(util.NewLogger("etcd "))
etcdClient := etcd.NewClient(etcdServerList)
config.NewConfigSourceEtcd(etcdClient,
serviceConfig.Channel("etcd"),
endpointsConfig.Channel("etcd"))
}
// And create a configuration source that reads from a local file
config.NewConfigSourceFile(*configFile,
serviceConfig.Channel("file"),
endpointsConfig.Channel("file"))
glog.Infof("Using configuration file %s", *configFile)
loadBalancer := proxy.NewLoadBalancerRR()
proxier := proxy.NewProxier(loadBalancer)

View File

@ -66,7 +66,7 @@ KUBELET_PID=$!
PROXY_LOG=/tmp/kube-proxy.log
${GO_OUT}/proxy \
--etcd_servers="http://127.0.0.1:4001" &> ${PROXY_LOG} &
--master="http://${API_HOST}:${API_PORT}" &> ${PROXY_LOG} &
PROXY_PID=$!
echo "Local Kubernetes cluster is running. Press Ctrl-C to shut it down."

View File

@ -23,6 +23,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -67,6 +68,9 @@ type ServiceInterface interface {
CreateService(api.Service) (api.Service, error)
UpdateService(api.Service) (api.Service, error)
DeleteService(string) error
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// VersionInterface has a method to retrieve the server version
@ -183,7 +187,12 @@ func (c *RESTClient) doRequest(request *http.Request) ([]byte, error) {
// requestBody is the body of the request. Can be nil.
// target the interface to marshal the JSON response into. Can be nil.
func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, target interface{}) ([]byte, error) {
request, err := http.NewRequest(method, c.makeURL(path), requestBody)
reqUrl, err := c.makeURL(path)
if err != nil {
return nil, err
}
request, err := http.NewRequest(method, reqUrl, requestBody)
if err != nil {
return nil, err
}
@ -201,8 +210,24 @@ func (c *RESTClient) rawRequest(method, path string, requestBody io.Reader, targ
return body, err
}
func (c *RESTClient) makeURL(path string) string {
return c.host + c.Prefix + path
func (c *RESTClient) makeURL(path string) (string, error) {
base := c.host
hostURL, err := url.Parse(base)
if err != nil {
return "", err
}
if hostURL.Scheme == "" {
hostURL, err = url.Parse("http://" + base)
if err != nil {
return "", err
}
if hostURL.Path != "" && hostURL.Path != "/" {
return "", fmt.Errorf("host must be a URL or a host:port pair: %s", base)
}
}
hostURL.Path += c.Prefix + path
return hostURL.String(), nil
}
// ListPods takes a selector, and returns the list of pods that match that selector
@ -309,6 +334,28 @@ func (c *Client) DeleteService(name string) error {
return c.Delete().Path("services").Path(name).Do().Error()
}
// WatchService returns a watch.Interface that watches the requested services.
func (c *Client) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("services").
UintParam("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}
// WatchEndpoints returns a watch.Interface that watches the requested endpoints for a service.
func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("endpoints").
UintParam("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}
// ServerVersion retrieves and parses the server's version.
func (c *Client) ServerVersion() (*version.Info, error) {
body, err := c.Get().AbsPath("/version").Do().Raw()

View File

@ -37,6 +37,33 @@ func makeURL(suffix string) string {
return apiPath + suffix
}
func TestValidatesHostParameter(t *testing.T) {
testCases := map[string]struct {
Value string
Err bool
}{
"foo.bar.com": {"http://foo.bar.com/api/v1beta1/", false},
"http://host/server": {"http://host/server/api/v1beta1/", false},
"host/server": {"", true},
}
for k, expected := range testCases {
c := RESTClient{host: k, Prefix: "/api/v1beta1/"}
actual, err := c.makeURL("")
switch {
case err == nil && expected.Err:
t.Errorf("expected error but was nil")
continue
case err != nil && !expected.Err:
t.Errorf("unexpected error %v", err)
continue
}
if expected.Value != actual {
t.Errorf("%s: expected %s, got %s", k, expected.Value, actual)
continue
}
}
}
func TestListEmptyPods(t *testing.T) {
c := &testClient{
Request: testRequest{Method: "GET", Path: "/pods"},

View File

@ -35,6 +35,7 @@ type Fake struct {
Actions []FakeAction
Pods api.PodList
Ctrl api.ReplicationController
Watch watch.Interface
}
func (c *Fake) ListPods(selector labels.Selector) (api.PodList, error) {
@ -88,8 +89,8 @@ func (c *Fake) DeleteReplicationController(controller string) error {
}
func (c *Fake) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers"})
return watch.NewFake(), nil
c.Actions = append(c.Actions, FakeAction{Action: "watch-controllers", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) GetService(name string) (api.Service, error) {
@ -112,6 +113,16 @@ func (c *Fake) DeleteService(service string) error {
return nil
}
func (c *Fake) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-services", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
c.Actions = append(c.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Watch, nil
}
func (c *Fake) ServerVersion() (*version.Info, error) {
c.Actions = append(c.Actions, FakeAction{Action: "get-version", Value: nil})
versionInfo := version.Get()

145
pkg/proxy/config/api.go Normal file
View File

@ -0,0 +1,145 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Watcher is the interface needed to receive changes to services and endpoints
type Watcher interface {
WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
// SourceAPI implements a configuration source for services and endpoints that
// uses the client watch API to efficiently detect changes.
type SourceAPI struct {
client Watcher
services chan<- ServiceUpdate
endpoints chan<- EndpointsUpdate
waitDuration time.Duration
reconnectDuration time.Duration
}
// NewSourceAPI creates a config source that watches for changes to the services and endpoints
func NewSourceAPI(client Watcher, period time.Duration, services chan<- ServiceUpdate, endpoints chan<- EndpointsUpdate) *SourceAPI {
config := &SourceAPI{
client: client,
services: services,
endpoints: endpoints,
waitDuration: period,
// prevent hot loops if the server starts to misbehave
reconnectDuration: time.Second * 1,
}
serviceVersion := uint64(0)
go util.Forever(func() {
config.runServices(&serviceVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
endpointVersion := uint64(0)
go util.Forever(func() {
config.runEndpoints(&endpointVersion)
time.Sleep(wait.Jitter(config.reconnectDuration, 0.0))
}, period)
return config
}
// runServices loops forever looking for changes to services
func (s *SourceAPI) runServices(resourceVersion *uint64) {
watcher, err := s.client.WatchServices(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for services changes: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
handleServicesWatch(resourceVersion, ch, s.services)
}
// handleServicesWatch loops over an event channel and delivers config changes to an update channel
func handleServicesWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- ServiceUpdate) {
for {
select {
case event, ok := <-ch:
if !ok {
glog.V(2).Infof("WatchServices channel closed")
return
}
service := event.Object.(*api.Service)
*resourceVersion = service.ResourceVersion + 1
switch event.Type {
case watch.Added, watch.Modified:
updates <- ServiceUpdate{Op: SET, Services: []api.Service{*service}}
case watch.Deleted:
updates <- ServiceUpdate{Op: SET}
}
}
}
}
// runEndpoints loops forever looking for changes to endpoints
func (s *SourceAPI) runEndpoints(resourceVersion *uint64) {
watcher, err := s.client.WatchEndpoints(labels.Everything(), labels.Everything(), *resourceVersion)
if err != nil {
glog.Errorf("Unable to watch for endpoints changes: %v", err)
time.Sleep(wait.Jitter(s.waitDuration, 0.0))
return
}
defer watcher.Stop()
ch := watcher.ResultChan()
handleEndpointsWatch(resourceVersion, ch, s.endpoints)
}
// handleEndpointsWatch loops over an event channel and delivers config changes to an update channel
func handleEndpointsWatch(resourceVersion *uint64, ch <-chan watch.Event, updates chan<- EndpointsUpdate) {
for {
select {
case event, ok := <-ch:
if !ok {
glog.V(2).Infof("WatchEndpoints channel closed")
return
}
endpoints := event.Object.(*api.Endpoints)
*resourceVersion = endpoints.ResourceVersion + 1
switch event.Type {
case watch.Added, watch.Modified:
updates <- EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints}}
case watch.Deleted:
updates <- EndpointsUpdate{Op: SET}
}
}
}
}

View File

@ -0,0 +1,116 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestServices(t *testing.T) {
service := api.Service{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
services := make(chan ServiceUpdate)
source := SourceAPI{client: fakeClient, services: services}
resourceVersion := uint64(0)
go func() {
// called twice
source.runServices(&resourceVersion)
source.runServices(&resourceVersion)
}()
// test adding a service to the watch
fakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}}) {
t.Errorf("expected call to watch-services, got %#v", fakeClient)
}
actual := <-services
expected := ServiceUpdate{Op: SET, Services: []api.Service{service}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that a delete results in a config change
fakeWatch.Delete(&service)
actual = <-services
expected = ServiceUpdate{Op: SET}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that closing the channel results in a new call to WatchServices with a higher resource version
newFakeWatch := watch.NewFake()
fakeClient.Watch = newFakeWatch
fakeWatch.Stop()
newFakeWatch.Add(&service)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-services", uint64(0)}, {"watch-services", uint64(3)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
}
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: uint64(2)}, Endpoints: []string{"127.0.0.1:9000"}}
fakeWatch := watch.NewFake()
fakeClient := &client.Fake{Watch: fakeWatch}
endpoints := make(chan EndpointsUpdate)
source := SourceAPI{client: fakeClient, endpoints: endpoints}
resourceVersion := uint64(0)
go func() {
// called twice
source.runEndpoints(&resourceVersion)
source.runEndpoints(&resourceVersion)
}()
// test adding an endpoint to the watch
fakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
actual := <-endpoints
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{endpoint}}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that a delete results in a config change
fakeWatch.Delete(&endpoint)
actual = <-endpoints
expected = EndpointsUpdate{Op: SET}
if !reflect.DeepEqual(expected, actual) {
t.Errorf("expected %#v, got %#v", expected, actual)
}
// verify that closing the channel results in a new call to WatchEndpoints with a higher resource version
newFakeWatch := watch.NewFake()
fakeClient.Watch = newFakeWatch
fakeWatch.Stop()
newFakeWatch.Add(&endpoint)
if !reflect.DeepEqual(fakeClient.Actions, []client.FakeAction{{"watch-endpoints", uint64(0)}, {"watch-endpoints", uint64(3)}}) {
t.Errorf("expected call to watch-endpoints, got %#v", fakeClient)
}
}

View File

@ -18,9 +18,21 @@ package wait
import (
"errors"
"math/rand"
"time"
)
// Jitter returns a time.Duration between duration and duration + maxFactor * duration,
// to allow clients to avoid converging on periodic behavior. If maxFactor is 0.0, a
// suggested default value will be chosen.
func Jitter(duration time.Duration, maxFactor float64) time.Duration {
if maxFactor <= 0.0 {
maxFactor = 1.0
}
wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration))
return wait
}
// ErrWaitTimeout is returned when the condition exited without success
var ErrWaitTimeout = errors.New("timed out waiting for the condition")