diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 441fbc27c9..84d47db70d 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -73,7 +73,7 @@ func NewHollowProxyOrDie( } proxyconfig.NewSourceAPI( client.Core().RESTClient(), - 30*time.Second, + 15*time.Minute, serviceConfig.Channel("api"), endpointsConfig.Channel("api"), ) diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index dbadeb10dc..cbc2acedfe 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -24,6 +24,8 @@ go_library( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/fields", "//vendor:k8s.io/apimachinery/pkg/types", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/client-go/tools/cache", ], ) @@ -38,6 +40,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/api/equality", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/tools/cache", ], diff --git a/pkg/proxy/config/api.go b/pkg/proxy/config/api.go index 0ea295fcf2..227785a4c2 100644 --- a/pkg/proxy/config/api.go +++ b/pkg/proxy/config/api.go @@ -17,59 +17,148 @@ limitations under the License. package config import ( + "fmt" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" ) // NewSourceAPI creates config source that watches for changes to the services and endpoints. func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { + stopCh := wait.NeverStop + servicesLW := cache.NewListWatchFromClient(c, "services", metav1.NamespaceAll, fields.Everything()) - cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run() + serviceController := NewServiceController(servicesLW, period, servicesChan) + go serviceController.Run(stopCh) endpointsLW := cache.NewListWatchFromClient(c, "endpoints", metav1.NamespaceAll, fields.Everything()) - cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run() -} + endpointsController := NewEndpointsController(endpointsLW, period, endpointsChan) + go endpointsController.Run(stopCh) -// NewServiceStore creates an undelta store that expands updates to the store into -// ServiceUpdate events on the channel. If no store is passed, a default store will -// be initialized. Allows reuse of a cache store across multiple components. -func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store { - fn := func(objs []interface{}) { - var services []api.Service - for _, o := range objs { - services = append(services, *(o.(*api.Service))) - } - ch <- ServiceUpdate{Op: SET, Services: services} - } - if store == nil { - store = cache.NewStore(cache.MetaNamespaceKeyFunc) - } - return &cache.UndeltaStore{ - Store: store, - PushFunc: fn, + if !cache.WaitForCacheSync(stopCh, serviceController.HasSynced, endpointsController.HasSynced) { + utilruntime.HandleError(fmt.Errorf("source controllers not synced")) } } -// NewEndpointsStore creates an undelta store that expands updates to the store into -// EndpointsUpdate events on the channel. If no store is passed, a default store will -// be initialized. Allows reuse of a cache store across multiple components. -func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store { - fn := func(objs []interface{}) { - var endpoints []api.Endpoints - for _, o := range objs { - endpoints = append(endpoints, *(o.(*api.Endpoints))) +func sendAddService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { + return func(obj interface{}) { + service, ok := obj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", obj)) + return } - ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints} - } - if store == nil { - store = cache.NewStore(cache.MetaNamespaceKeyFunc) - } - return &cache.UndeltaStore{ - Store: store, - PushFunc: fn, + servicesChan <- ServiceUpdate{Op: ADD, Service: service} } } + +func sendUpdateService(servicesChan chan<- ServiceUpdate) func(oldObj, newObj interface{}) { + return func(_, newObj interface{}) { + service, ok := newObj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", newObj)) + return + } + servicesChan <- ServiceUpdate{Op: UPDATE, Service: service} + } +} + +func sendDeleteService(servicesChan chan<- ServiceUpdate) func(obj interface{}) { + return func(obj interface{}) { + var service *api.Service + switch t := obj.(type) { + case *api.Service: + service = t + case cache.DeletedFinalStateUnknown: + var ok bool + service, ok = t.Obj.(*api.Service) + if !ok { + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t.Obj)) + return + } + default: + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Service: %v", t)) + return + } + servicesChan <- ServiceUpdate{Op: REMOVE, Service: service} + } +} + +func sendAddEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) { + return func(obj interface{}) { + endpoints, ok := obj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj)) + return + } + endpointsChan <- EndpointsUpdate{Op: ADD, Endpoints: endpoints} + } +} + +func sendUpdateEndpoints(endpointsChan chan<- EndpointsUpdate) func(oldObj, newObj interface{}) { + return func(_, newObj interface{}) { + endpoints, ok := newObj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", newObj)) + return + } + endpointsChan <- EndpointsUpdate{Op: UPDATE, Endpoints: endpoints} + } +} + +func sendDeleteEndpoints(endpointsChan chan<- EndpointsUpdate) func(obj interface{}) { + return func(obj interface{}) { + var endpoints *api.Endpoints + switch t := obj.(type) { + case *api.Endpoints: + endpoints = t + case cache.DeletedFinalStateUnknown: + var ok bool + endpoints, ok = t.Obj.(*api.Endpoints) + if !ok { + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", t.Obj)) + return + } + default: + utilruntime.HandleError(fmt.Errorf("cannot convert to *api.Endpoints: %v", obj)) + return + } + endpointsChan <- EndpointsUpdate{Op: REMOVE, Endpoints: endpoints} + } +} + +// NewServiceController creates a controller that is watching services and sending +// updates into ServiceUpdate channel. +func NewServiceController(lw cache.ListerWatcher, period time.Duration, ch chan<- ServiceUpdate) cache.Controller { + _, serviceController := cache.NewInformer( + lw, + &api.Service{}, + period, + cache.ResourceEventHandlerFuncs{ + AddFunc: sendAddService(ch), + UpdateFunc: sendUpdateService(ch), + DeleteFunc: sendDeleteService(ch), + }, + ) + return serviceController +} + +// NewEndpointsController creates a controller that is watching endpoints and sending +// updates into EndpointsUpdate channel. +func NewEndpointsController(lw cache.ListerWatcher, period time.Duration, ch chan<- EndpointsUpdate) cache.Controller { + _, endpointsController := cache.NewInformer( + lw, + &api.Endpoints{}, + period, + cache.ResourceEventHandlerFuncs{ + AddFunc: sendAddEndpoints(ch), + UpdateFunc: sendUpdateEndpoints(ch), + DeleteFunc: sendDeleteEndpoints(ch), + }, + ) + return endpointsController +} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 5429325adc..0ed9914932 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -23,6 +23,7 @@ import ( apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/kubernetes/pkg/api" @@ -63,24 +64,16 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { ch := make(chan ServiceUpdate) - cache.NewReflector(lw, &api.Service{}, NewServiceStore(nil, ch), 30*time.Second).Run() + serviceController := NewServiceController(lw, 30*time.Second, ch) + go serviceController.Run(wait.NeverStop) + // Add the first service + fakeWatch.Add(service1v1) got, ok := <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - expected := ServiceUpdate{Op: SET, Services: []api.Service{}} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v; Got %#v", expected, got) - } - - // Add the first service - fakeWatch.Add(service1v1) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = ServiceUpdate{Op: SET, Services: []api.Service{*service1v1}} + expected := ServiceUpdate{Op: ADD, Service: service1v1} if !apiequality.Semantic.DeepEqual(expected, got) { t.Errorf("Expected %#v; Got %#v", expected, got) } @@ -92,11 +85,10 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { t.Errorf("Unable to read from channel when expected") } // Could be sorted either of these two ways: - expectedA := ServiceUpdate{Op: SET, Services: []api.Service{*service1v1, *service2}} - expectedB := ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v1}} + expected = ServiceUpdate{Op: ADD, Service: service2} - if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) { - t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + if !apiequality.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) } // Modify service1 @@ -105,11 +97,10 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { if !ok { t.Errorf("Unable to read from channel when expected") } - expectedA = ServiceUpdate{Op: SET, Services: []api.Service{*service1v2, *service2}} - expectedB = ServiceUpdate{Op: SET, Services: []api.Service{*service2, *service1v2}} + expected = ServiceUpdate{Op: UPDATE, Service: service1v2} - if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) { - t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + if !apiequality.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) } // Delete service1 @@ -118,7 +109,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { if !ok { t.Errorf("Unable to read from channel when expected") } - expected = ServiceUpdate{Op: SET, Services: []api.Service{*service2}} + expected = ServiceUpdate{Op: REMOVE, Service: service1v2} if !apiequality.Semantic.DeepEqual(expected, got) { t.Errorf("Expected %#v, Got %#v", expected, got) } @@ -129,7 +120,7 @@ func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) { if !ok { t.Errorf("Unable to read from channel when expected") } - expected = ServiceUpdate{Op: SET, Services: []api.Service{}} + expected = ServiceUpdate{Op: REMOVE, Service: service2} if !apiequality.Semantic.DeepEqual(expected, got) { t.Errorf("Expected %#v, Got %#v", expected, got) } @@ -174,24 +165,16 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { ch := make(chan EndpointsUpdate) - cache.NewReflector(lw, &api.Endpoints{}, NewEndpointsStore(nil, ch), 30*time.Second).Run() + endpointsController := NewEndpointsController(lw, 30*time.Second, ch) + go endpointsController.Run(wait.NeverStop) + // Add the first endpoints + fakeWatch.Add(endpoints1v1) got, ok := <-ch if !ok { t.Errorf("Unable to read from channel when expected") } - expected := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}} - if !apiequality.Semantic.DeepEqual(expected, got) { - t.Errorf("Expected %#v; Got %#v", expected, got) - } - - // Add the first endpoints - fakeWatch.Add(endpoints1v1) - got, ok = <-ch - if !ok { - t.Errorf("Unable to read from channel when expected") - } - expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1}} + expected := EndpointsUpdate{Op: ADD, Endpoints: endpoints1v1} if !apiequality.Semantic.DeepEqual(expected, got) { t.Errorf("Expected %#v; Got %#v", expected, got) } @@ -203,11 +186,10 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { t.Errorf("Unable to read from channel when expected") } // Could be sorted either of these two ways: - expectedA := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v1, *endpoints2}} - expectedB := EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v1}} + expected = EndpointsUpdate{Op: ADD, Endpoints: endpoints2} - if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) { - t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + if !apiequality.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) } // Modify endpoints1 @@ -216,11 +198,10 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { if !ok { t.Errorf("Unable to read from channel when expected") } - expectedA = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints1v2, *endpoints2}} - expectedB = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2, *endpoints1v2}} + expected = EndpointsUpdate{Op: UPDATE, Endpoints: endpoints1v2} - if !apiequality.Semantic.DeepEqual(expectedA, got) && !apiequality.Semantic.DeepEqual(expectedB, got) { - t.Errorf("Expected %#v or %#v, Got %#v", expectedA, expectedB, got) + if !apiequality.Semantic.DeepEqual(expected, got) { + t.Errorf("Expected %#v, Got %#v", expected, got) } // Delete endpoints1 @@ -229,7 +210,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { if !ok { t.Errorf("Unable to read from channel when expected") } - expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{*endpoints2}} + expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints1v2} if !apiequality.Semantic.DeepEqual(expected, got) { t.Errorf("Expected %#v, Got %#v", expected, got) } @@ -240,7 +221,7 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { if !ok { t.Errorf("Unable to read from channel when expected") } - expected = EndpointsUpdate{Op: SET, Endpoints: []api.Endpoints{}} + expected = EndpointsUpdate{Op: REMOVE, Endpoints: endpoints2} if !apiequality.Semantic.DeepEqual(expected, got) { t.Errorf("Expected %#v, Got %#v", expected, got) } diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 85c6adadd4..1e13acda23 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -31,28 +31,22 @@ type Operation int // These are the available operation types. const ( - SET Operation = iota - ADD + ADD Operation = iota + UPDATE REMOVE ) // ServiceUpdate describes an operation of services, sent on the channel. -// You can add or remove single services by sending an array of size one and Op == ADD|REMOVE. -// For setting the state of the system to a given state for this source configuration, set Services as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source channel. -// To remove all services, set Services to empty array and Op to SET +// You can add, update or remove single service by setting Op == ADD|UPDATE|REMOVE. type ServiceUpdate struct { - Services []api.Service - Op Operation + Service *api.Service + Op Operation } // EndpointsUpdate describes an operation of endpoints, sent on the channel. -// You can add or remove single endpoints by sending an array of size one and Op == ADD|REMOVE. -// For setting the state of the system to a given state for this source configuration, set Endpoints as desired and Op to SET, -// which will reset the system state to that specified in this operation for this source channel. -// To remove all endpoints, set Endpoints to empty array and Op to SET +// You can add, update or remove single endpoints by setting Op == ADD|UPDATE|REMOVE. type EndpointsUpdate struct { - Endpoints []api.Endpoints + Endpoints *api.Endpoints Op Operation } @@ -87,7 +81,7 @@ func NewEndpointsConfig() *EndpointsConfig { // pending interrupt, but don't want to drop them if the handler is doing // work. updates := make(chan struct{}, 1) - store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)} + store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]*api.Endpoints)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) @@ -118,7 +112,7 @@ func (c *EndpointsConfig) Config() []api.Endpoints { type endpointsStore struct { endpointLock sync.RWMutex - endpoints map[string]map[types.NamespacedName]api.Endpoints + endpoints map[string]map[types.NamespacedName]*api.Endpoints updates chan<- struct{} } @@ -126,36 +120,28 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { s.endpointLock.Lock() endpoints := s.endpoints[source] if endpoints == nil { - endpoints = make(map[types.NamespacedName]api.Endpoints) + endpoints = make(map[types.NamespacedName]*api.Endpoints) } update := change.(EndpointsUpdate) switch update.Op { - case ADD: + case ADD, UPDATE: glog.V(5).Infof("Adding new endpoint from source %s : %s", source, spew.Sdump(update.Endpoints)) - for _, value := range update.Endpoints { - name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} - endpoints[name] = value - } + name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} + endpoints[name] = update.Endpoints case REMOVE: - glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update)) - for _, value := range update.Endpoints { - name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} - delete(endpoints, name) - } - case SET: - glog.V(5).Infof("Setting endpoints %s", spew.Sdump(update)) - // Clear the old map entries by just creating a new map - endpoints = make(map[types.NamespacedName]api.Endpoints) - for _, value := range update.Endpoints { - name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} - endpoints[name] = value - } + glog.V(5).Infof("Removing an endpoint %s", spew.Sdump(update.Endpoints)) + name := types.NamespacedName{Namespace: update.Endpoints.Namespace, Name: update.Endpoints.Name} + delete(endpoints, name) default: glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.endpoints[source] = endpoints s.endpointLock.Unlock() if s.updates != nil { + // TODO: We should not broadcase the signal, until the state is fully + // populated (i.e. until initial LIST of the underlying reflector is + // propagated here). + // // Since we record the snapshot before sending this signal, it's // possible that the consumer ends up performing an extra update. select { @@ -173,7 +159,7 @@ func (s *endpointsStore) MergedState() interface{} { endpoints := make([]api.Endpoints, 0) for _, sourceEndpoints := range s.endpoints { for _, value := range sourceEndpoints { - endpoints = append(endpoints, value) + endpoints = append(endpoints, *value) } } return endpoints @@ -195,7 +181,7 @@ func NewServiceConfig() *ServiceConfig { // pending interrupt, but don't want to drop them if the handler is doing // work. updates := make(chan struct{}, 1) - store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} + store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]*api.Service)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) @@ -226,7 +212,7 @@ func (c *ServiceConfig) Config() []api.Service { type serviceStore struct { serviceLock sync.RWMutex - services map[string]map[types.NamespacedName]api.Service + services map[string]map[types.NamespacedName]*api.Service updates chan<- struct{} } @@ -234,36 +220,28 @@ func (s *serviceStore) Merge(source string, change interface{}) error { s.serviceLock.Lock() services := s.services[source] if services == nil { - services = make(map[types.NamespacedName]api.Service) + services = make(map[types.NamespacedName]*api.Service) } update := change.(ServiceUpdate) switch update.Op { - case ADD: - glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Services)) - for _, value := range update.Services { - name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} - services[name] = value - } + case ADD, UPDATE: + glog.V(5).Infof("Adding new service from source %s : %s", source, spew.Sdump(update.Service)) + name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} + services[name] = update.Service case REMOVE: - glog.V(5).Infof("Removing a service %s", spew.Sdump(update)) - for _, value := range update.Services { - name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} - delete(services, name) - } - case SET: - glog.V(5).Infof("Setting services %s", spew.Sdump(update)) - // Clear the old map entries by just creating a new map - services = make(map[types.NamespacedName]api.Service) - for _, value := range update.Services { - name := types.NamespacedName{Namespace: value.Namespace, Name: value.Name} - services[name] = value - } + glog.V(5).Infof("Removing a service %s", spew.Sdump(update.Service)) + name := types.NamespacedName{Namespace: update.Service.Namespace, Name: update.Service.Name} + delete(services, name) default: glog.V(4).Infof("Received invalid update type: %s", spew.Sdump(update)) } s.services[source] = services s.serviceLock.Unlock() if s.updates != nil { + // TODO: We should not broadcase the signal, until the state is fully + // populated (i.e. until initial LIST of the underlying reflector is + // propagated here). + // // Since we record the snapshot before sending this signal, it's // possible that the consumer ends up performing an extra update. select { @@ -281,7 +259,7 @@ func (s *serviceStore) MergedState() interface{} { services := make([]api.Service, 0) for _, sourceServices := range s.services { for _, value := range sourceServices { - services = append(services, value) + services = append(services, *value) } } return services diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index 5be9f21dd9..74f0bc19c2 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -130,22 +130,12 @@ func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints } } -func CreateServiceUpdate(op Operation, services ...api.Service) ServiceUpdate { - ret := ServiceUpdate{Op: op} - ret.Services = make([]api.Service, len(services)) - for i, value := range services { - ret.Services[i] = value - } - return ret +func CreateServiceUpdate(op Operation, service *api.Service) ServiceUpdate { + return ServiceUpdate{Op: op, Service: service} } -func CreateEndpointsUpdate(op Operation, endpoints ...api.Endpoints) EndpointsUpdate { - ret := EndpointsUpdate{Op: op} - ret.Endpoints = make([]api.Endpoints, len(endpoints)) - for i, value := range endpoints { - ret.Endpoints[i] = value - } - return ret +func CreateEndpointsUpdate(op Operation, endpoints *api.Endpoints) EndpointsUpdate { + return EndpointsUpdate{Op: op, Endpoints: endpoints} } func TestNewServiceAddedAndNotified(t *testing.T) { @@ -153,13 +143,12 @@ func TestNewServiceAddedAndNotified(t *testing.T) { channel := config.Channel("one") handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, }) channel <- serviceUpdate - handler.ValidateServices(t, serviceUpdate.Services) - + handler.ValidateServices(t, []api.Service{*serviceUpdate.Service}) } func TestServiceAddedRemovedSetAndNotified(t *testing.T) { @@ -167,34 +156,26 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) { channel := config.Channel("one") handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, }) channel <- serviceUpdate - handler.ValidateServices(t, serviceUpdate.Services) + handler.ValidateServices(t, []api.Service{*serviceUpdate.Service}) - serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, }) channel <- serviceUpdate2 - services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]} + services := []api.Service{*serviceUpdate2.Service, *serviceUpdate.Service} handler.ValidateServices(t, services) - serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ + serviceUpdate3 := CreateServiceUpdate(REMOVE, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, }) channel <- serviceUpdate3 - services = []api.Service{serviceUpdate2.Services[0]} - handler.ValidateServices(t, services) - - serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ - ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 99}}}, - }) - channel <- serviceUpdate4 - services = []api.Service{serviceUpdate4.Services[0]} + services = []api.Service{*serviceUpdate2.Service} handler.ValidateServices(t, services) } @@ -207,17 +188,17 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) { } handler := NewServiceHandlerMock() config.RegisterHandler(handler) - serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, }) - serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, }) channelOne <- serviceUpdate1 channelTwo <- serviceUpdate2 - services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]} + services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service} handler.ValidateServices(t, services) } @@ -229,17 +210,17 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T handler2 := NewServiceHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate1 := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 10}}}, }) - serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ + serviceUpdate2 := CreateServiceUpdate(ADD, &api.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Protocol: "TCP", Port: 20}}}, }) channelOne <- serviceUpdate1 channelTwo <- serviceUpdate2 - services := []api.Service{serviceUpdate2.Services[0], serviceUpdate1.Services[0]} + services := []api.Service{*serviceUpdate2.Service, *serviceUpdate1.Service} handler.ValidateServices(t, services) handler2.ValidateServices(t, services) } @@ -252,14 +233,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ + endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, }) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ + endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, @@ -269,7 +250,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. channelOne <- endpointsUpdate1 channelTwo <- endpointsUpdate2 - endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]} + endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) } @@ -282,14 +263,14 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t handler2 := NewEndpointsHandlerMock() config.RegisterHandler(handler) config.RegisterHandler(handler2) - endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{ + endpointsUpdate1 := CreateEndpointsUpdate(ADD, &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "1.1.1.1"}, {IP: "2.2.2.2"}}, Ports: []api.EndpointPort{{Port: 80}}, }}, }) - endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{ + endpointsUpdate2 := CreateEndpointsUpdate(ADD, &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "3.3.3.3"}, {IP: "4.4.4.4"}}, @@ -299,12 +280,12 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t channelOne <- endpointsUpdate1 channelTwo <- endpointsUpdate2 - endpoints := []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0]} + endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Add one more - endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{ + endpointsUpdate3 := CreateEndpointsUpdate(ADD, &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "5.5.5.5"}, {IP: "6.6.6.6"}}, @@ -312,12 +293,12 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t }}, }) channelTwo <- endpointsUpdate3 - endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]} + endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Update the "foo" service with new endpoints - endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{ + endpointsUpdate1 = CreateEndpointsUpdate(ADD, &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "7.7.7.7"}}, @@ -325,15 +306,15 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t }}, }) channelOne <- endpointsUpdate1 - endpoints = []api.Endpoints{endpointsUpdate2.Endpoints[0], endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]} + endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) // Remove "bar" service - endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}}) + endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}}) channelTwo <- endpointsUpdate2 - endpoints = []api.Endpoints{endpointsUpdate1.Endpoints[0], endpointsUpdate3.Endpoints[0]} + endpoints = []api.Endpoints{*endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) }