Merge pull request #42108 from wojtek-t/reduce_kube_proxy_cpu_usage

Automatic merge from submit-queue (batch tested with PRs 40746, 41699, 42108, 42174, 42093)

Switch kube-proxy to informers & save 2/3 of cpu & memory of non-iptables related code.

Fix #42000

This PR should be no-op from the behavior perspective.
It is changing KubeProxy to use standard "informer" framework instead of combination of reflector + undelta store.

This is significantly reducing CPU usage of kube-proxy and number of memory allocations.
Previously, on every endpoints/service update, we were copying __all__ endpoints/services at least 3 times, now it is once (which should also be removed in the future).

In Kubemark-500, hollow-proxies were processing backlog from load test for an hour after the test was finishing. With this change, it is keeping up with the load.

@thockin @ncdc @derekwaynecarr
pull/6/head
Kubernetes Submit Queue 2017-02-28 07:51:28 -08:00 committed by GitHub
commit 49e80116b7
6 changed files with 222 additions and 190 deletions

View File

@ -73,7 +73,7 @@ func NewHollowProxyOrDie(
}
proxyconfig.NewSourceAPI(
client.Core().RESTClient(),
30*time.Second,
15*time.Minute,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)

View File

@ -24,6 +24,8 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/fields",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
],
)
@ -38,6 +40,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/api/equality",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/tools/cache",
],

View File

@ -17,59 +17,148 @@ limitations under the License.
package config
import (
"fmt"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
)
// NewSourceAPI creates config source that watches for changes to the services and endpoints.
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) {
stopCh := wait.NeverStop
servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything())
cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run()
serviceController := NewServiceController(servicesLW, period, servicesChan)
go serviceController.Run(stopCh)
endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything())
cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run()
}
endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan)
go endpointsController.Run(stopCh)
// NewServiceStore creates an undelta store that expands updates to the store into
// ServiceUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store {
fn := func(objs []interface{}) {
var services []api.Service
for _, o := range objs {
services = append(services, *(o.(*api.Service)))
}
ch <- ServiceUpdate{Op: SET, Services: services}
}
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
return &cache.UndeltaStore{
Store: store,
PushFunc: fn,
if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) {
utilruntime.HandleError(fmt.Errorf("source controllers not synced"))
}
}
// NewEndpointsStore creates an undelta store that expands updates to the store into
// EndpointsUpdate events on the channel. If no store is passed, a default store will
// be initialized. Allows reuse of a cache store across multiple components.
func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store {
fn := func(objs []interface{}) {
var endpoints []api.Endpoints
for _, o := range objs {
endpoints = append(endpoints, *(o.(*api.Endpoints)))
func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
service, ok := obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj))
return
}
ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints}
}
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
return &cache.UndeltaStore{
Store: store,
PushFunc: fn,
servicesChan <- ServiceUpdate{Op: ADD, Service: service}
}
}
func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
service, ok := newObj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj))
return
}
servicesChan <- ServiceUpdate{Op: UPDATE, Service: service}
}
}
func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) {
return func(obj interface{}) {
var service *api.Service
switch t := obj.(type) {
case *api.Service:
service = t
case cache.DeletedFinalStateUnknown:
var ok bool
service, ok = t.Obj.(*api.Service)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj))
return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t))
return
}
servicesChan <- ServiceUpdate{Op: REMOVE, Service: service}
}
}
func sendAddEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) {
return func(obj interface{}) {
endpoints, ok := obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj))
return
}
endpointsChan <- EndpointsUpdate{Op: ADD, Endpoints: endpoints}
}
}
func sendUpdateEndpoints(endpointsChan chan<- EndpointsUpdate) func(oldObj, newObj interface{}) {
return func(_, newObj interface{}) {
endpoints, ok := newObj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", newObj))
return
}
endpointsChan <- EndpointsUpdate{Op: UPDATE, Endpoints: endpoints}
}
}
func sendDeleteEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) {
return func(obj interface{}) {
var endpoints *api.Endpoints
switch t := obj.(type) {
case *api.Endpoints:
endpoints = t
case cache.DeletedFinalStateUnknown:
var ok bool
endpoints, ok = t.Obj.(*api.Endpoints)
if !ok {
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", t.Obj))
return
}
default:
utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj))
return
}
endpointsChan <- EndpointsUpdate{Op: REMOVE, Endpoints: endpoints}
}
}
// NewServiceController creates a controller that is watching services and sending
// updates into ServiceUpdate channel.
func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller {
_, serviceController := cache.NewInformer(
lw,
&api.Service{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddService(ch),
UpdateFunc: sendUpdateService(ch),
DeleteFunc: sendDeleteService(ch),
},
)
return serviceController
}
// NewEndpointsController creates a controller that is watching endpoints and sending
// updates into EndpointsUpdate channel.
func NewEndpointsController(lw cache.ListerWatcher, period time.Duration, ch chan<- EndpointsUpdate) cache.Controller {
_, endpointsController := cache.NewInformer(
lw,
&api.Endpoints{},
period,
cache.ResourceEventHandlerFuncs{
AddFunc: sendAddEndpoints(ch),
UpdateFunc: sendUpdateEndpoints(ch),
DeleteFunc: sendDeleteEndpoints(ch),
},
)
return endpointsController
}

View File

@ -23,6 +23,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api"
@ -63,24 +64,16 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
ch := make(chan ServiceUpdate)
cache.NewReflector(lw, &api.Service{}, NewServiceStore(nil, ch), 30*time.Second).Run()
serviceController := NewServiceController(lw, 30*time.Second, ch)
go serviceController.Run(wait.NeverStop)
// Add the first service
fakeWatch.Add(service1v1)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := ServiceUpdate{Op: SET, Services: []api.Service{}}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
// Add the first service
fakeWatch.Add(service1v1)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{*service1v1}}
expected := ServiceUpdate{Op: ADD, Service: service1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
@ -92,11 +85,10 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expectedA := ServiceUpdate{Op: SET, Services: []api.Service{*service1v1, *service2}}
expectedB := ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v1}}
expected = ServiceUpdate{Op: ADD, Service: service2}
if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
// Modify service1
@ -105,11 +97,10 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expectedA = ServiceUpdate{Op: SET, Services: []api.Service{*service1v2, *service2}}
expectedB = ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v2}}
expected = ServiceUpdate{Op: UPDATE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
// Delete service1
@ -118,7 +109,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{*service2}}
expected = ServiceUpdate{Op: REMOVE, Service: service1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
@ -129,7 +120,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = ServiceUpdate{Op: SET, Services: []api.Service{}}
expected = ServiceUpdate{Op: REMOVE, Service: service2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
@ -174,24 +165,16 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
ch := make(chan EndpointsUpdate)
cache.NewReflector(lw, &api.Endpoints{}, NewEndpointsStore(nil, ch), 30*time.Second).Run()
endpointsController := NewEndpointsController(lw, 30*time.Second, ch)
go endpointsController.Run(wait.NeverStop)
// Add the first endpoints
fakeWatch.Add(endpoints1v1)
got, ok := <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
// Add the first endpoints
fakeWatch.Add(endpoints1v1)
got, ok = <-ch
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1}}
expected := EndpointsUpdate{Op: ADD, Endpoints: endpoints1v1}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v; Got %#v", expected, got)
}
@ -203,11 +186,10 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
t.Errorf("Unable to read from channel when expected")
}
// Could be sorted either of these two ways:
expectedA := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1, *endpoints2}}
expectedB := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v1}}
expected = EndpointsUpdate{Op: ADD, Endpoints: endpoints2}
if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
// Modify endpoints1
@ -216,11 +198,10 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expectedA = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v2, *endpoints2}}
expectedB = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v2}}
expected = EndpointsUpdate{Op: UPDATE, Endpoints: endpoints1v2}
if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) {
t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got)
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
// Delete endpoints1
@ -229,7 +210,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2}}
expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints1v2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}
@ -240,7 +221,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
if !ok {
t.Errorf("Unable to read from channel when expected")
}
expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}}
expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints2}
if !apiequality.Semantic.DeepEqual(expected, got) {
t.Errorf("Expected %#v, Got %#v", expected, got)
}

View File

@ -31,28 +31,22 @@ type Operation int
// These are the available operation types.
const (
SET Operation = iota
ADD
ADD Operation = iota
UPDATE
REMOVE
)
// ServiceUpdate describes an operation of services, sent on the channel.
// You can add or remove single services by sending an array of size one and Op == ADD|REMOVE.
// For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET,
// which will reset the system state to that specified in this operation for this source channel.
// To remove all services, set Services to empty array and Op to SET
// You can add, update or remove single service by setting Op == ADD|UPDATE|REMOVE.
type ServiceUpdate struct {
Services []api.Service
Op Operation
Service *api.Service
Op Operation
}
// EndpointsUpdate describes an operation of endpoints, sent on the channel.
// You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE.
// For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET,
// which will reset the system state to that specified in this operation for this source channel.
// To remove all endpoints, set Endpoints to empty array and Op to SET
// You can add, update or remove single endpoints by setting Op == ADD|UPDATE|REMOVE.
type EndpointsUpdate struct {
Endpoints []api.Endpoints
Endpoints *api.Endpoints
Op Operation
}
@ -87,7 +81,7 @@ func NewEndpointsConfig() *EndpointsConfig {
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates := make(chan struct{}, 1)
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]*api.Endpoints)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
@ -118,7 +112,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints {
type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[types.NamespacedName]api.Endpoints
endpoints map[string]map[types.NamespacedName]*api.Endpoints
updates chan<- struct{}
}
@ -126,36 +120,28 @@ func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock()
endpoints := s.endpoints[source]
if endpoints == nil {
endpoints = make(map[types.NamespacedName]api.Endpoints)
endpoints = make(map[types.NamespacedName]*api.Endpoints)
}
update := change.(EndpointsUpdate)
switch update.Op {
case ADD:
case ADD, UPDATE:
glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints))
for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
endpoints[name] = value
}
name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name}
endpoints[name] = update.Endpoints
case REMOVE:
glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update))
for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(endpoints, name)
}
case SET:
glog.V(5).Infof("Setting endpoints %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map
endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
endpoints[name] = value
}
glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints))
name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name}
delete(endpoints, name)
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
}
s.endpoints[source] = endpoints
s.endpointLock.Unlock()
if s.updates != nil {
// TODO: We should not broadcase the signal, until the state is fully
// populated (i.e. until initial LIST of the underlying reflector is
// propagated here).
//
// Since we record the snapshot before sending this signal, it's
// possible that the consumer ends up performing an extra update.
select {
@ -173,7 +159,7 @@ func (s *endpointsStore) MergedState() interface{} {
endpoints := make([]api.Endpoints, 0)
for _, sourceEndpoints := range s.endpoints {
for _, value := range sourceEndpoints {
endpoints = append(endpoints, value)
endpoints = append(endpoints, *value)
}
}
return endpoints
@ -195,7 +181,7 @@ func NewServiceConfig() *ServiceConfig {
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates := make(chan struct{}, 1)
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]*api.Service)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
@ -226,7 +212,7 @@ func (c *ServiceConfig) Config() []api.Service {
type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[types.NamespacedName]api.Service
services map[string]map[types.NamespacedName]*api.Service
updates chan<- struct{}
}
@ -234,36 +220,28 @@ func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[types.NamespacedName]api.Service)
services = make(map[types.NamespacedName]*api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD:
glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services))
for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
services[name] = value
}
case ADD, UPDATE:
glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
services[name] = update.Service
case REMOVE:
glog.V(5).Infof("Removing a service %s", spew.Sdump(update))
for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
delete(services, name)
}
case SET:
glog.V(5).Infof("Setting services %s", spew.Sdump(update))
// Clear the old map entries by just creating a new map
services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services {
name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name}
services[name] = value
}
glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service))
name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name}
delete(services, name)
default:
glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update))
}
s.services[source] = services
s.serviceLock.Unlock()
if s.updates != nil {
// TODO: We should not broadcase the signal, until the state is fully
// populated (i.e. until initial LIST of the underlying reflector is
// propagated here).
//
// Since we record the snapshot before sending this signal, it's
// possible that the consumer ends up performing an extra update.
select {
@ -281,7 +259,7 @@ func (s *serviceStore) MergedState() interface{} {
services := make([]api.Service, 0)
for _, sourceServices := range s.services {
for _, value := range sourceServices {
services = append(services, value)
services = append(services, *value)
}
}
return services

View File

@ -130,22 +130,12 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints
}
}
func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate {
ret := ServiceUpdate{Op: op}
ret.Services = make([]api.Service, len(services))
for i, value := range services {
ret.Services[i] = value
}
return ret
func CreateServiceUpdate(op Operation, service *api.Service) ServiceUpdate {
return ServiceUpdate{Op: op, Service: service}
}
func CreateEndpointsUpdate(op Operation, endpoints ...api.Endpoints) EndpointsUpdate {
ret := EndpointsUpdate{Op: op}
ret.Endpoints = make([]api.Endpoints, len(endpoints))
for i, value := range endpoints {
ret.Endpoints[i] = value
}
return ret
func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpdate {
return EndpointsUpdate{Op: op, Endpoints: endpoints}
}
func TestNewServiceAddedAndNotified(t *testing.T) {
@ -153,13 +143,12 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
}
func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
@ -167,34 +156,26 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{
serviceUpdate := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)
handler.ValidateServices(t, []api.Service{*serviceUpdate.Service})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service}
handler.ValidateServices(t, services)
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{
serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
})
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 99}}},
})
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
services = []api.Service{*serviceUpdate2.Service}
handler.ValidateServices(t, services)
}
@ -207,17 +188,17 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
}
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
handler.ValidateServices(t, services)
}
@ -229,17 +210,17 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}},
})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{
serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}},
})
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]}
services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service}
handler.ValidateServices(t, services)
handler2.ValidateServices(t, services)
}
@ -252,14 +233,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
@ -269,7 +250,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}
@ -282,14 +263,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2 := NewEndpointsHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}},
Ports: []api.EndpointPort{{Port: 80}},
}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}},
@ -299,12 +280,12 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
channelOne <- endpointsUpdate1
channelTwo <- endpointsUpdate2
endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]}
endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate3 := CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}},
@ -312,12 +293,12 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
}},
})
channelTwo <- endpointsUpdate3
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
endpointsUpdate1 = CreateEndpointsUpdate(ADD, &api.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}},
@ -325,15 +306,15 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
}},
})
channelOne <- endpointsUpdate1
endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
channelTwo <- endpointsUpdate2
endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]}
endpoints = []api.Endpoints{*endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints}
handler.ValidateEndpoints(t, endpoints)
handler2.ValidateEndpoints(t, endpoints)
}