Simplify proxy config for Services by removing Mux.

pull/6/head
Wojciech Tyczynski 2017-03-17 15:15:51 +01:00
parent 596527dafa
commit 7ce368ccd2
8 changed files with 157 additions and 319 deletions

View File

@ -305,19 +305,14 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := proxyconfig.NewServiceConfig()
serviceConfig := proxyconfig.NewServiceConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
serviceConfig.RegisterHandler(proxier)
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := proxyconfig.NewEndpointsConfig(client.Core().RESTClient(), config.ConfigSyncPeriod)
endpointsConfig.RegisterHandler(endpointsHandler)
go endpointsConfig.Run(wait.NeverStop)
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
config.ConfigSyncPeriod,
serviceConfig.Channel("api"),
)
config.NodeRef = &clientv1.ObjectReference{
Kind: "Node",
Name: hostname,

View File

@ -136,7 +136,7 @@ func main() {
iptInterface := fakeiptables.NewFake()
serviceConfig := proxyconfig.NewServiceConfig()
serviceConfig := proxyconfig.NewServiceConfig(internalClientset.Core().RESTClient(), 15*time.Minute)
serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
endpointsConfig := proxyconfig.NewEndpointsConfig(internalClientset.Core().RESTClient(), 15*time.Minute)

View File

@ -17,8 +17,6 @@ limitations under the License.
package kubemark
import (
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@ -74,11 +72,7 @@ func NewHollowProxyOrDie(
}
go endpointsConfig.Run(wait.NeverStop)
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
15*time.Minute,
serviceConfig.Channel("api"),
)
go serviceConfig.Run(wait.NeverStop)
hollowProxy, err := proxyapp.NewProxyServer(client, eventClient, config, iptInterface, &FakeProxier{}, broadcaster, recorder, nil, "fake")
if err != nil {

View File

@ -11,7 +11,6 @@ load(
go_library(
name = "go_default_library",
srcs = [
"api.go",
"config.go",
"doc.go",
],
@ -20,14 +19,11 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/client/listers/core/internalversion:go_default_library",
"//pkg/util/config:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
],
)
@ -42,7 +38,6 @@ go_test(
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",

View File

@ -1,109 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
)
// NewSourceAPI creates config source that watches for changes to the services and endpoints.
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate) {
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
newSourceAPI(servicesLW, period, servicesChan, wait.NeverStop)
}
func newSourceAPI(
servicesLW cache.ListerWatcher,
period time.Duration,
servicesChan chan<- ServiceUpdate,
stopCh <-chan struct{}) {
serviceController := NewServiceController(servicesLW, period, servicesChan)
go serviceController.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced) {
utilruntime.HandleError(fmt.Errorf("source controllers not synced"))
return
}
servicesChan <- ServiceUpdate{Op: SYNCED}
}
func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj))
return
}
servicesChan <- ServiceUpdate{Op: ADD, Service: service}
}
}
func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
service, ok := newObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj))
return
}
servicesChan <- ServiceUpdate{Op: UPDATE, Service: service}
}
}
func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
var service *api.Service
switch t := obj.(type) {
case *api.Service:
service = t
case cache.DeletedFinalStateUnknown:
var ok bool
service, ok = t.Obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj))
return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t))
return
}
servicesChan <- ServiceUpdate{Op: REMOVE, Service: service}
}
}
// NewServiceController creates a controller that is watching services and sending
// updates into ServiceUpdate channel.
func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller {
_, serviceController := cache.NewInformer(
lw,
&api.Service{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddService(ch),
UpdateFunc: sendUpdateService(ch),
DeleteFunc: sendDeleteService(ch),
},
)
return serviceController
}

View File

@ -23,10 +23,8 @@ import (
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@ -65,68 +63,40 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
watchResp: fakeWatch,
}
ch := make(chan ServiceUpdate)
stopCh := make(chan struct{})
defer close(stopCh)
serviceController := NewServiceController(lw, 30*time.Second, ch)
go serviceController.Run(wait.NeverStop)
ch := make(chan struct{})
handler := newSvcHandler(t, nil, func() { ch <- struct{}{} })
serviceConfig := newServiceConfig(lw, time.Minute)
serviceConfig.RegisterHandler(handler)
go serviceConfig.Run(stopCh)
// Add the first service
handler.expected = []api.Service{*service1v1}
fakeWatch.Add(service1v1)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := ServiceUpdate{Op: ADD, Service: service1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
<-ch
// Add another service
handler.expected = []api.Service{*service1v1, *service2}
fakeWatch.Add(service2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expected = ServiceUpdate{Op: ADD, Service: service2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Modify service1
handler.expected = []api.Service{*service1v2, *service2}
fakeWatch.Modify(service1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: UPDATE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Delete service1
handler.expected = []api.Service{*service2}
fakeWatch.Delete(service1v2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: REMOVE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
// Delete service2
handler.expected = []api.Service{}
fakeWatch.Delete(service2)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: REMOVE, Service: service2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
<-ch
}
func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
@ -270,7 +240,7 @@ func TestInitialSync(t *testing.T) {
watchResp: fakeEpsWatch,
}
svcConfig := NewServiceConfig()
svcConfig := newServiceConfig(svcLW, time.Minute)
epsConfig := newEndpointsConfig(epsLW, time.Minute)
svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler)
@ -279,7 +249,7 @@ func TestInitialSync(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
go svcConfig.Run(stopCh)
go epsConfig.Run(stopCh)
newSourceAPI(svcLW, time.Minute, svcConfig.Channel("one"), stopCh)
wg.Wait()
}

View File

@ -18,15 +18,12 @@ package config
import (
"fmt"
"sync"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@ -183,102 +180,101 @@ func (c *EndpointsConfig) dispatchUpdate() {
// ServiceConfig tracks a set of service configurations.
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
type ServiceConfig struct {
mux *config.Mux
bcaster *config.Broadcaster
store *serviceStore
informer cache.Controller
lister listers.ServiceLister
handlers []ServiceConfigHandler
// updates channel is used to trigger registered handlers
updates chan struct{}
}
// NewServiceConfig creates a new ServiceConfig.
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
// The updates channel is used to send interrupts to the Services handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates := make(chan struct{}, 1)
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]*api.Service)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
return &ServiceConfig{mux, bcaster, store}
func NewServiceConfig(c cache.Getter, period time.Duration) *ServiceConfig {
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
return newServiceConfig(servicesLW, period)
}
func newServiceConfig(lw cache.ListerWatcher, period time.Duration) *ServiceConfig {
result := &ServiceConfig{}
store, informer := cache.NewIndexerInformer(
lw,
&api.Service{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddService,
UpdateFunc: result.handleUpdateService,
DeleteFunc: result.handleDeleteService,
},
cache.Indexers{},
)
result.informer = informer
result.lister = listers.NewServiceLister(store)
return result
}
// RegisterHandler registers a handler which is called on every services change.
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
handler.OnServiceUpdate(instance.([]api.Service))
}))
c.handlers = append(c.handlers, handler)
}
// Channel returns a channel to which services updates should be delivered.
func (c *ServiceConfig) Channel(source string) chan ServiceUpdate {
ch := c.mux.Channel(source)
serviceCh := make(chan ServiceUpdate)
func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
// The updates channel is used to send interrupts to the Services handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
c.updates = make(chan struct{}, 1)
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("service controller not synced"))
return
}
// We hanve synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
for update := range serviceCh {
ch <- update
for range c.updates {
services, err := c.lister.List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing services from cache: %v", err)
// This will cause a retry (if there isnt' any other trigger in-flight).
c.dispatchUpdate()
continue
}
svcs := make([]api.Service, 0, len(services))
for i := range services {
svcs = append(svcs, *services[i])
}
for i := range c.handlers {
glog.V(3).Infof("Calling handler.OnServiceUpdate()")
c.handlers[i].OnServiceUpdate(svcs)
}
}
}()
return serviceCh
// Close updates channel when stopCh is closed.
go func() {
<-stopCh
close(c.updates)
}()
}
// Config returns list of all services from underlying store.
func (c *ServiceConfig) Config() []api.Service {
return c.store.MergedState().([]api.Service)
func (c *ServiceConfig) handleAddService(_ interface{}) {
c.dispatchUpdate()
}
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[types.NamespacedName]*api.Service
synced bool
updates chan<- struct{}
func (c *ServiceConfig) handleUpdateService(_, _ interface{}) {
c.dispatchUpdate()
}
func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[types.NamespacedName]*api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD, UPDATE:
glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
services[name] = update.Service
case REMOVE:
glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
delete(services, name)
case SYNCED:
s.synced = true
func (c *ServiceConfig) handleDeleteService(_ interface{}) {
c.dispatchUpdate()
}
func (c *ServiceConfig) dispatchUpdate() {
select {
case c.updates <- struct{}{}:
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
glog.V(4).Infof("Service handler alread has a pending interrupt.")
}
s.services[source] = services
synced := s.synced
s.serviceLock.Unlock()
if s.updates != nil && synced {
select {
case s.updates <- struct{}{}:
default:
glog.V(4).Infof("Service handler already has a pending interrupt.")
}
}
return nil
}
func (s *serviceStore) MergedState() interface{} {
s.serviceLock.RLock()
defer s.serviceLock.RUnlock()
services := make([]api.Service, 0)
for _, sourceServices := range s.services {
for _, value := range sourceServices {
services = append(services, *value)
}
}
return services
}
// watchForUpdates invokes bcaster.Notify() with the latest version of an object

View File

@ -139,92 +139,89 @@ func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpda
}
func TestNewServiceAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channel := config.Channel("one")
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newServiceConfig(lw, time.Minute)
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
go config.Run(stopCh)
service := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
}
fakeWatch.Add(service)
handler.ValidateServices(t, []api.Service{*service})
}
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channel <- serviceUpdate2
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
})
channel <- serviceUpdate3
services = []api.Service{*serviceUpdate2.Service}
handler.ValidateServices(t, services)
}
func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
if channelOne == channelTwo {
t.Error("Same channel handed back for one and two")
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newServiceConfig(lw, time.Minute)
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
go config.Run(stopCh)
service1 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
}
fakeWatch.Add(service1)
handler.ValidateServices(t, []api.Service{*service1})
service2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
}
fakeWatch.Add(service2)
services := []api.Service{*service2, *service1}
handler.ValidateServices(t, services)
fakeWatch.Delete(service1)
services = []api.Service{*service2}
handler.ValidateServices(t, services)
}
func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T) {
config := NewServiceConfig()
config.store.synced = true
channelOne := config.Channel("one")
channelTwo := config.Channel("two")
func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
fakeWatch := watch.NewFake()
lw := fakeLW{
listResp: &api.ServiceList{Items: []api.Service{}},
watchResp: fakeWatch,
}
stopCh := make(chan struct{})
defer close(stopCh)
config := newServiceConfig(lw, time.Minute)
handler := NewServiceHandlerMock()
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
go config.Run(stopCh)
service1 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
}
service2 := &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
}
fakeWatch.Add(service1)
fakeWatch.Add(service2)
services := []api.Service{*service2, *service1}
handler.ValidateServices(t, services)
handler2.ValidateServices(t, services)
}