Merge pull request #43295 from wojtek-t/remove_muxer_from_kube_proxy

Automatic merge from submit-queue

Simplify proxy config for Endpoints by removing Mux.

This will help with future optimizations (for now the gain from this PR is negligible), and, at least in my opinion, the code is much cleaner and easier to understand.
Kubernetes Submit Queue 2017-03-26 03:46:55 -07:00 committed by GitHub
commit c8e15e135b
9 changed files with 328 additions and 597 deletions
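
For reference, the caller-side wiring this change leaves behind is just "construct from a cache.Getter, register handlers, run": the Mux, the per-source Channel("api") plumbing, and NewSourceAPI all go away. A minimal sketch of the resulting pattern follows; it is not part of this commit, and the handler type, function names, package name, and 15-minute resync period are illustrative, while the proxyconfig calls themselves mirror the kube-proxy and kubemark hunks below.

package example

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/api"
	proxyconfig "k8s.io/kubernetes/pkg/proxy/config"
)

// printingHandler stands in for a real proxier. Note the semantics: on every
// change it is handed the full current state, not incremental ADD/UPDATE/REMOVE ops.
type printingHandler struct{}

func (printingHandler) OnServiceUpdate(services []api.Service) {
	fmt.Printf("now tracking %d services\n", len(services))
}

func (printingHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {
	fmt.Printf("now tracking %d endpoints objects\n", len(endpoints))
}

// wireProxyConfig mirrors the kube-proxy change in this commit: register
// handlers first, so the initial sync is not missed, then run each config.
func wireProxyConfig(c cache.Getter) {
	serviceConfig := proxyconfig.NewServiceConfig(c, 15*time.Minute)
	serviceConfig.RegisterHandler(printingHandler{})
	go serviceConfig.Run(wait.NeverStop)

	endpointsConfig := proxyconfig.NewEndpointsConfig(c, 15*time.Minute)
	endpointsConfig.RegisterHandler(printingHandler{})
	go endpointsConfig.Run(wait.NeverStop)
}
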

@@ -305,18 +305,13 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := proxyconfig.NewServiceConfig()
serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
serviceConfig.RegisterHandler(proxier)
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := proxyconfig.NewEndpointsConfig()
endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
endpointsConfig.RegisterHandler(endpointsHandler)
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
config.ConfigSyncPeriod,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
go endpointsConfig.Run(wait.NeverStop)
config.NodeRef = &clientv1.ObjectReference{
Kind: "Node",

@@ -18,6 +18,7 @@ package main
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/flag"
@@ -135,10 +136,10 @@ func main() {
iptInterface := fakeiptables.NewFake()
serviceConfig := proxyconfig.NewServiceConfig()
serviceConfig := proxyconfig.NewServiceConfig(internalClientset.Core().RESTClient(), 15*time.Minute)
serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
endpointsConfig := proxyconfig.NewEndpointsConfig()
endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute)
endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
eventClient, err := clientgoclientset.NewForConfig(clientConfig)

@@ -41,6 +41,7 @@ go_library(
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/record",

@@ -17,9 +17,8 @@ limitations under the License.
package kubemark
import (
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
@@ -71,12 +70,9 @@ func NewHollowProxyOrDie(
UID: types.UID(nodeName),
Namespace: "",
}
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
15*time.Minute,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
go endpointsConfig.Run(wait.NeverStop)
go serviceConfig.Run(wait.NeverStop)
hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake")
if err != nil {

@@ -11,21 +11,19 @@ load(
go_library(
name = "go_default_library",
srcs = [
"api.go",
"config.go",
"doc.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/util/config:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
],
)
@@ -40,7 +38,6 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

@@ -1,175 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
)
// NewSourceAPI creates config source that watches for changes to the services and endpoints.
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
newSourceAPI(servicesLW, endpointsLW, period, servicesChan, endpointsChan, wait.NeverStop)
}
func newSourceAPI(
servicesLW cache.ListerWatcher,
endpointsLW cache.ListerWatcher,
period time.Duration,
servicesChan chan<- ServiceUpdate,
endpointsChan chan<- EndpointsUpdate,
stopCh <-chan struct{}) {
serviceController := NewServiceController(servicesLW, period, servicesChan)
go serviceController.Run(stopCh)
endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan)
go endpointsController.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) {
utilruntime.HandleError(fmt.Errorf("source controllers not synced"))
return
}
servicesChan <- ServiceUpdate{Op: SYNCED}
endpointsChan <- EndpointsUpdate{Op: SYNCED}
}
func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj))
return
}
servicesChan <- ServiceUpdate{Op: ADD, Service: service}
}
}
func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
service, ok := newObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj))
return
}
servicesChan <- ServiceUpdate{Op: UPDATE, Service: service}
}
}
func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
var service *api.Service
switch t := obj.(type) {
case *api.Service:
service = t
case cache.DeletedFinalStateUnknown:
var ok bool
service, ok = t.Obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj))
return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t))
return
}
servicesChan <- ServiceUpdate{Op: REMOVE, Service: service}
}
}
func sendAddEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) {
return func(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj))
return
}
endpointsChan <- EndpointsUpdate{Op: ADD, Endpoints: endpoints}
}
}
func sendUpdateEndpoints(endpointsChan chan<- EndpointsUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
endpoints, ok := newObj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", newObj))
return
}
endpointsChan <- EndpointsUpdate{Op: UPDATE, Endpoints: endpoints}
}
}
func sendDeleteEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) {
return func(obj interface{}) {
var endpoints *api.Endpoints
switch t := obj.(type) {
case *api.Endpoints:
endpoints = t
case cache.DeletedFinalStateUnknown:
var ok bool
endpoints, ok = t.Obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", t.Obj))
return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj))
return
}
endpointsChan <- EndpointsUpdate{Op: REMOVE, Endpoints: endpoints}
}
}
// NewServiceController creates a controller that is watching services and sending
// updates into ServiceUpdate channel.
func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller {
_, serviceController := cache.NewInformer(
lw,
&api.Service{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddService(ch),
UpdateFunc: sendUpdateService(ch),
DeleteFunc: sendDeleteService(ch),
},
)
return serviceController
}
// NewEndpointsController creates a controller that is watching endpoints and sending
// updates into EndpointsUpdate channel.
func NewEndpointsController(lw cache.ListerWatcher, period time.Duration, ch chan<- EndpointsUpdate) cache.Controller {
_, endpointsController := cache.NewInformer(
lw,
&api.Endpoints{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddEndpoints(ch),
UpdateFunc: sendUpdateEndpoints(ch),
DeleteFunc: sendDeleteEndpoints(ch),
},
)
return endpointsController
}

@@ -23,10 +23,8 @@ import (
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@@ -65,68 +63,40 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
watchResp: fakeWatch,
}
ch := make(chan ServiceUpdate)
stopCh := make(chan struct{})
defer close(stopCh)
serviceController := NewServiceController(lw, 30*time.Second, ch)
go serviceController.Run(wait.NeverStop)
ch := make(chan struct{})
handler := newSvcHandler(t, nil, func() { ch <- struct{}{} })
serviceConfig := newServiceConfig(lw, time.Minute)
serviceConfig.RegisterHandler(handler)
go serviceConfig.Run(stopCh)
// Add the first service
handler.expected = []api.Service{*service1v1}
fakeWatch.Add(service1v1)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := ServiceUpdate{Op: ADD, Service: service1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
<-ch
// Add another service
handler.expected = []api.Service{*service1v1, *service2}
fakeWatch.Add(service2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expected = ServiceUpdate{Op: ADD, Service: service2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Modify service1
handler.expected = []api.Service{*service1v2, *service2}
fakeWatch.Modify(service1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: UPDATE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Delete service1
handler.expected = []api.Service{*service2}
fakeWatch.Delete(service1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: REMOVE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Delete service2
handler.expected = []api.Service{}
fakeWatch.Delete(service2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: REMOVE, Service: service2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
}
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
@@ -166,68 +136,40 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
watchResp: fakeWatch,
}
ch := make(chan EndpointsUpdate)
stopCh := make(chan struct{})
defer close(stopCh)
endpointsController := NewEndpointsController(lw, 30*time.Second, ch)
go endpointsController.Run(wait.NeverStop)
ch := make(chan struct{})
handler := newEpsHandler(t, nil, func() { ch <- struct{}{} })
endpointsConfig := newEndpointsConfig(lw, time.Minute)
endpointsConfig.RegisterHandler(handler)
go endpointsConfig.Run(stopCh)
// Add the first endpoints
handler.expected = []*api.Endpoints{endpoints1v1}
fakeWatch.Add(endpoints1v1)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := EndpointsUpdate{Op: ADD, Endpoints: endpoints1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
<-ch
// Add another endpoints
handler.expected = []*api.Endpoints{endpoints1v1, endpoints2}
fakeWatch.Add(endpoints2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expected = EndpointsUpdate{Op: ADD, Endpoints: endpoints2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Modify endpoints1
handler.expected = []*api.Endpoints{endpoints1v2, endpoints2}
fakeWatch.Modify(endpoints1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: UPDATE, Endpoints: endpoints1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Delete endpoints1
handler.expected = []*api.Endpoints{endpoints2}
fakeWatch.Delete(endpoints1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Delete endpoints2
handler.expected = []*api.Endpoints{}
fakeWatch.Delete(endpoints2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
}
type svcHandler struct {
@@ -286,13 +228,6 @@ func TestInitialSync(t *testing.T) {
// Wait for both services and endpoints handler.
wg.Add(2)
svcConfig := NewServiceConfig()
epsConfig := NewEndpointsConfig()
svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler)
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
epsConfig.RegisterHandler(epsHandler)
// Setup fake api client.
fakeSvcWatch := watch.NewFake()
svcLW := fakeLW{
@@ -305,8 +240,16 @@ func TestInitialSync(t *testing.T) {
watchResp: fakeEpsWatch,
}
svcConfig := newServiceConfig(svcLW, time.Minute)
epsConfig := newEndpointsConfig(epsLW, time.Minute)
svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler)
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
epsConfig.RegisterHandler(epsHandler)
stopCh := make(chan struct{})
defer close(stopCh)
newSourceAPI(svcLW, epsLW, time.Minute, svcConfig.Channel("one"), epsConfig.Channel("two"), stopCh)
go svcConfig.Run(stopCh)
go epsConfig.Run(stopCh)
wg.Wait()
}

@@ -17,40 +17,20 @@ limitations under the License.
package config
import (
"sync"
"fmt"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
listers "k8s.io/kubernetes/pkg/client/listers/core/internalversion"
"k8s.io/kubernetes/pkg/util/config"
)
// Operation is a type of operation of services or endpoints.
type Operation int
// These are the available operation types.
const (
ADD Operation = iota
UPDATE
REMOVE
SYNCED
)
// ServiceUpdate describes an operation of services, sent on the channel.
// You can add, update or remove single service by setting Op == ADD|UPDATE|REMOVE.
type ServiceUpdate struct {
Service *api.Service
Op Operation
}
// EndpointsUpdate describes an operation of endpoints, sent on the channel.
// You can add, update or remove single endpoints by setting Op == ADD|UPDATE|REMOVE.
type EndpointsUpdate struct {
Endpoints *api.Endpoints
Op Operation
}
// ServiceConfigHandler is an abstract interface of objects which receive update notifications for the set of services.
type ServiceConfigHandler interface {
// OnServiceUpdate gets called when a configuration has been changed by one of the sources.
@@ -76,203 +56,204 @@ type EndpointsConfigHandler interface {
// EndpointsConfig tracks a set of endpoints configurations.
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
type EndpointsConfig struct {
mux *config.Mux
bcaster *config.Broadcaster
store *endpointsStore
informer cache.Controller
lister listers.EndpointsLister
handlers []EndpointsConfigHandler
// updates channel is used to trigger registered handlers.
updates chan struct{}
}
// NewEndpointsConfig creates a new EndpointsConfig.
// It immediately runs the created EndpointsConfig.
func NewEndpointsConfig() *EndpointsConfig {
// The updates channel is used to send interrupts to the Endpoints handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates := make(chan struct{}, 1)
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]*api.Endpoints)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
return &EndpointsConfig{mux, bcaster, store}
func NewEndpointsConfig(c cache.Getter, period time.Duration) *EndpointsConfig {
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
return newEndpointsConfig(endpointsLW, period)
}
func newEndpointsConfig(lw cache.ListerWatcher, period time.Duration) *EndpointsConfig {
result := &EndpointsConfig{}
store, informer := cache.NewIndexerInformer(
lw,
&api.Endpoints{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddEndpoints,
UpdateFunc: result.handleUpdateEndpoints,
DeleteFunc: result.handleDeleteEndpoints,
},
cache.Indexers{},
)
result.informer = informer
result.lister = listers.NewEndpointsLister(store)
return result
}
// RegisterHandler registers a handler which is called on every endpoints change.
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
handler.OnEndpointsUpdate(instance.([]*api.Endpoints))
}))
c.handlers = append(c.handlers, handler)
}
// Channel returns a channel to which endpoints updates should be delivered.
func (c *EndpointsConfig) Channel(source string) chan EndpointsUpdate {
ch := c.mux.Channel(source)
endpointsCh := make(chan EndpointsUpdate)
// Run starts the underlying informer and goroutine responsible for calling
// registered handlers.
func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
// The updates channel is used to send interrupts to the Endpoints handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
c.updates = make(chan struct{}, 1)
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("endpoint controller not synced"))
return
}
// We have synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
for update := range endpointsCh {
ch <- update
for range c.updates {
endpoints, err := c.lister.List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing endpoints from cache: %v", err)
// This will cause a retry (if there isn't any other trigger in-flight).
c.dispatchUpdate()
continue
}
if endpoints == nil {
endpoints = []*api.Endpoints{}
}
for i := range c.handlers {
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
c.handlers[i].OnEndpointsUpdate(endpoints)
}
}
}()
return endpointsCh
// Close updates channel when stopCh is closed.
go func() {
<-stopCh
close(c.updates)
}()
}
// Config returns list of all endpoints from underlying store.
func (c *EndpointsConfig) Config() []api.Endpoints {
return c.store.MergedState().([]api.Endpoints)
func (c *EndpointsConfig) handleAddEndpoints(_ interface{}) {
c.dispatchUpdate()
}
type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[types.NamespacedName]*api.Endpoints
synced bool
updates chan<- struct{}
func (c *EndpointsConfig) handleUpdateEndpoints(_, _ interface{}) {
c.dispatchUpdate()
}
func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock()
endpoints := s.endpoints[source]
if endpoints == nil {
endpoints = make(map[types.NamespacedName]*api.Endpoints)
}
update := change.(EndpointsUpdate)
switch update.Op {
case ADD, UPDATE:
glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name}
endpoints[name] = update.Endpoints
case REMOVE:
glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints))
name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name}
delete(endpoints, name)
case SYNCED:
s.synced = true
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
}
s.endpoints[source] = endpoints
synced := s.synced
s.endpointLock.Unlock()
if s.updates != nil && synced {
func (c *EndpointsConfig) handleDeleteEndpoints(_ interface{}) {
c.dispatchUpdate()
}
func (c *EndpointsConfig) dispatchUpdate() {
select {
case s.updates <- struct{}{}:
case c.updates <- struct{}{}:
default:
glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
}
}
return nil
}
func (s *endpointsStore) MergedState() interface{} {
s.endpointLock.RLock()
defer s.endpointLock.RUnlock()
endpoints := make([]*api.Endpoints, 0)
for _, sourceEndpoints := range s.endpoints {
for _, value := range sourceEndpoints {
endpoints = append(endpoints, value)
}
}
return endpoints
}
// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
mux *config.Mux
bcaster *config.Broadcaster
store *serviceStore
informer cache.Controller
lister listers.ServiceLister
handlers []ServiceConfigHandler
// updates channel is used to trigger registered handlers
updates chan struct{}
}
// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
// The updates channel is used to send interrupts to the Services handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates := make(chan struct{}, 1)
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]*api.Service)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
return &ServiceConfig{mux, bcaster, store}
func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig {
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
return newServiceConfig(servicesLW, period)
}
func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig {
result := &ServiceConfig{}
store, informer := cache.NewIndexerInformer(
lw,
&api.Service{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddService,
UpdateFunc: result.handleUpdateService,
DeleteFunc: result.handleDeleteService,
},
cache.Indexers{},
)
result.informer = informer
result.lister = listers.NewServiceLister(store)
return result
}
// RegisterHandler registers a handler which is called on every services change.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
handler.OnServiceUpdate(instance.([]api.Service))
}))
c.handlers = append(c.handlers, handler)
}
// Channel returns a channel to which services updates should be delivered.
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
ch := c.mux.Channel(source)
serviceCh := make(chan ServiceUpdate)
// Run starts the underlying informer and goroutine responsible for calling
// registered handlers.
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
// The updates channel is used to send interrupts to the Services handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
c.updates = make(chan struct{}, 1)
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("service controller not synced"))
return
}
// We have synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
for update := range serviceCh {
ch <- update
for range c.updates {
services, err := c.lister.List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing services from cache: %v", err)
// This will cause a retry (if there isn't any other trigger in-flight).
c.dispatchUpdate()
continue
}
svcs := make([]api.Service, 0, len(services))
for i := range services {
svcs = append(svcs, *services[i])
}
for i := range c.handlers {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
c.handlers[i].OnServiceUpdate(svcs)
}
}
}()
return serviceCh
// Close updates channel when stopCh is closed.
go func() {
<-stopCh
close(c.updates)
}()
}
// Config returns list of all services from underlying store.
func (c *ServiceConfig) Config() []api.Service {
return c.store.MergedState().([]api.Service)
func (c *ServiceConfig) handleAddService(_ interface{}) {
c.dispatchUpdate()
}
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[types.NamespacedName]*api.Service
synced bool
updates chan<- struct{}
func (c *ServiceConfig) handleUpdateService(_, _ interface{}) {
c.dispatchUpdate()
}
func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[types.NamespacedName]*api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD, UPDATE:
glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
services[name] = update.Service
case REMOVE:
glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
delete(services, name)
case SYNCED:
s.synced = true
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
}
s.services[source] = services
synced := s.synced
s.serviceLock.Unlock()
if s.updates != nil && synced {
func (c *ServiceConfig) handleDeleteService(_ interface{}) {
c.dispatchUpdate()
}
func (c *ServiceConfig) dispatchUpdate() {
select {
case s.updates <- struct{}{}:
case c.updates <- struct{}{}:
default:
glog.V(4).Infof("Service handler already has a pending interrupt.")
glog.V(4).Infof("Service handler alread has a pending interrupt.")
}
}
return nil
}
func (s *serviceStore) MergedState() interface{} {
s.serviceLock.RLock()
defer s.serviceLock.RUnlock()
services := make([]api.Service, 0)
for _, sourceServices := range s.services {
for _, value := range sourceServices {
services = append(services, *value)
}
}
return services
}
// watchForUpdates invokes bcaster.Notify() with the latest version of an object
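
A side note on the Run/dispatchUpdate code above: the updates channel is deliberately buffered with capacity one, so a burst of informer events collapses into a single pending wake-up and the handler goroutine always rebuilds from the freshest lister state. Below is a standalone illustration of that coalescing pattern; the names are invented for the example and are not taken from this commit.

package main

import (
	"fmt"
	"time"
)

// coalescer mimics the updates channel used by ServiceConfig and EndpointsConfig:
// a one-element buffer means at most one wake-up is ever pending.
type coalescer struct {
	updates chan struct{}
}

func newCoalescer() *coalescer {
	return &coalescer{updates: make(chan struct{}, 1)}
}

// notify plays the role of dispatchUpdate: enqueue a wake-up if none is pending,
// otherwise drop it, since the consumer will read fresh state anyway.
func (c *coalescer) notify() {
	select {
	case c.updates <- struct{}{}:
	default:
	}
}

func main() {
	c := newCoalescer()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for range c.updates {
			// Stand-in for "list everything from the cache and call the handlers".
			fmt.Println("rebuilding state from cache")
			time.Sleep(50 * time.Millisecond)
		}
	}()
	// A burst of notifications collapses into only a handful of rebuilds.
	for i := 0; i < 100; i++ {
		c.notify()
	}
	time.Sleep(200 * time.Millisecond)
	close(c.updates)
	<-done
}
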

@@ -24,19 +24,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api"
)
const TomcatPort int = 8080
const TomcatName = "tomcat"
var TomcatEndpoints = map[string]string{"c0": "1.1.1.1:18080", "c1": "2.2.2.2:18081"}
const MysqlPort int = 3306
const MysqlName = "mysql"
var MysqlEndpoints = map[string]string{"c0": "1.1.1.1:13306", "c3": "2.2.2.2:13306"}
type sortedServices []api.Service
func (s sortedServices) Len() int {
@@ -129,197 +120,198 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints
}
}
func CreateServiceUpdate(op Operation, service *api.Service) ServiceUpdate {
return ServiceUpdate{Op: op, Service: service}
}
func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpdate {
return EndpointsUpdate{Op: op, Endpoints: endpoints}
}
func TestNewServiceAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channel := config.Channel("one")
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newServiceConfig(lw, time.Minute)
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
go config.Run(stopCh)
service := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
}
fakeWatch.Add(service)
handler.ValidateServices(t, []api.Service{*service})
}
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channel <- serviceUpdate2
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
})
channel <- serviceUpdate3
services = []api.Service{*serviceUpdate2.Service}
handler.ValidateServices(t, services)
}
func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
if channelOne == channelTwo {
t.Error("Same channel handed back for one and two")
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newServiceConfig(lw, time.Minute)
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
go config.Run(stopCh)
service1 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
}
fakeWatch.Add(service1)
handler.ValidateServices(t, []api.Service{*service1})
service2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
}
fakeWatch.Add(service2)
services := []api.Service{*service2, *service1}
handler.ValidateServices(t, services)
fakeWatch.Delete(service1)
services = []api.Service{*service2}
handler.ValidateServices(t, services)
}
func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newServiceConfig(lw, time.Minute)
handler := NewServiceHandlerMock()
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
go config.Run(stopCh)
service1 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
}
service2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
}
fakeWatch.Add(service1)
fakeWatch.Add(service2)
services := []api.Service{*service2, *service1}
handler.ValidateServices(t, services)
handler2.ValidateServices(t, services)
}
func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewEndpointsConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.EndpointsList{Items: []api.Endpoints{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newEndpointsConfig(lw, time.Minute)
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{
go config.Run(stopCh)
endpoints1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{
}
endpoints2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
}
fakeWatch.Add(endpoints1)
fakeWatch.Add(endpoints2)
endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints}
endpoints := []*api.Endpoints{endpoints2, endpoints1}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}
func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
config := NewEndpointsConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.EndpointsList{Items: []api.Endpoints{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newEndpointsConfig(lw, time.Minute)
handler := NewEndpointsHandlerMock()
handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{
go config.Run(stopCh)
endpoints1 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{
}
endpoints2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
}
fakeWatch.Add(endpoints1)
fakeWatch.Add(endpoints2)
endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints}
endpoints := []*api.Endpoints{endpoints2, endpoints1}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, &api.Endpoints{
endpoints3 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
channelTwo <- endpointsUpdate3
endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints}
}
fakeWatch.Add(endpoints3)
endpoints = []*api.Endpoints{endpoints2, endpoints1, endpoints3}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, &api.Endpoints{
endpoints1v2 := &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
channelOne <- endpointsUpdate1
endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints}
}
fakeWatch.Modify(endpoints1v2)
endpoints = []*api.Endpoints{endpoints2, endpoints1v2, endpoints3}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
channelTwo <- endpointsUpdate2
endpoints = []*api.Endpoints{endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints}
// Remove "bar" endpoints
fakeWatch.Delete(endpoints2)
endpoints = []*api.Endpoints{endpoints1v2, endpoints3}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}