From 2a6083c3e73ac997a164d481f2857a17a6d1a8d1 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 9 Mar 2017 16:42:45 +0100 Subject: [PATCH] Avoid copying endpoints object in kube-proxy --- pkg/kubemark/hollow_proxy.go | 4 +- pkg/proxy/config/api_test.go | 8 +-- pkg/proxy/config/config.go | 15 ++++-- pkg/proxy/config/config_test.go | 22 ++++---- pkg/proxy/iptables/proxier.go | 19 ++++--- pkg/proxy/iptables/proxier_test.go | 54 ++++++++++---------- pkg/proxy/userspace/proxier_test.go | 40 +++++++-------- pkg/proxy/userspace/roundrobin.go | 5 +- pkg/proxy/userspace/roundrobin_test.go | 62 +++++++++++------------ pkg/proxy/winuserspace/proxier_test.go | 40 +++++++-------- pkg/proxy/winuserspace/roundrobin.go | 5 +- pkg/proxy/winuserspace/roundrobin_test.go | 62 +++++++++++------------ 12 files changed, 176 insertions(+), 160 deletions(-) diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index 84d47db70d..7de8cba4b5 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -40,8 +40,8 @@ type HollowProxy struct { type FakeProxyHandler struct{} -func (*FakeProxyHandler) OnServiceUpdate(services []api.Service) {} -func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []api.Endpoints) {} +func (*FakeProxyHandler) OnServiceUpdate(services []api.Service) {} +func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {} type FakeProxier struct{} diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index b187b3d37e..4f5cf4c016 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -250,15 +250,15 @@ func (s *svcHandler) OnServiceUpdate(services []api.Service) { type epsHandler struct { t *testing.T - expected []api.Endpoints + expected []*api.Endpoints done func() } -func newEpsHandler(t *testing.T, eps []api.Endpoints, done func()) *epsHandler { +func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) *epsHandler { return &epsHandler{t: t, expected: eps, done: done} } -func (e *epsHandler) OnEndpointsUpdate(endpoints []api.Endpoints) { +func (e *epsHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) { defer e.done() sort.Sort(sortedEndpoints(endpoints)) if !reflect.DeepEqual(e.expected, endpoints) { @@ -290,7 +290,7 @@ func TestInitialSync(t *testing.T) { epsConfig := NewEndpointsConfig() svcHandler := newSvcHandler(t, []api.Service{*svc2, *svc1}, wg.Done) svcConfig.RegisterHandler(svcHandler) - epsHandler := newEpsHandler(t, []api.Endpoints{*eps2, *eps1}, wg.Done) + epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) epsConfig.RegisterHandler(epsHandler) // Setup fake api client. diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index cdd894d71a..2487f57951 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -63,7 +63,14 @@ type EndpointsConfigHandler interface { // OnEndpointsUpdate gets called when endpoints configuration is changed for a given // service on any of the configuration sources. An example is when a new // service comes up, or when containers come up or down for an existing service. - OnEndpointsUpdate(endpoints []api.Endpoints) + // + // NOTE: For efficiency, endpoints are being passed by reference, thus, + // OnEndpointsUpdate should NOT modify pointers of a given slice. + // Those endpoints objects are shared with other layers of the system and + // are guaranteed to be immutable with the assumption that are also + // not mutated by those handlers. Make a deep copy if you need to modify + // them in your code. + OnEndpointsUpdate(endpoints []*api.Endpoints) } // EndpointsConfig tracks a set of endpoints configurations. @@ -93,7 +100,7 @@ func NewEndpointsConfig() *EndpointsConfig { func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") - handler.OnEndpointsUpdate(instance.([]api.Endpoints)) + handler.OnEndpointsUpdate(instance.([]*api.Endpoints)) })) } @@ -158,10 +165,10 @@ func (s *endpointsStore) Merge(source string, change interface{}) error { func (s *endpointsStore) MergedState() interface{} { s.endpointLock.RLock() defer s.endpointLock.RUnlock() - endpoints := make([]api.Endpoints, 0) + endpoints := make([]*api.Endpoints, 0) for _, sourceEndpoints := range s.endpoints { for _, value := range sourceEndpoints { - endpoints = append(endpoints, *value) + endpoints = append(endpoints, value) } } return endpoints diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index cdb2edad94..6be9c37855 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -83,7 +83,7 @@ func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []a } } -type sortedEndpoints []api.Endpoints +type sortedEndpoints []*api.Endpoints func (s sortedEndpoints) Len() int { return len(s) @@ -96,24 +96,24 @@ func (s sortedEndpoints) Less(i, j int) bool { } type EndpointsHandlerMock struct { - updated chan []api.Endpoints + updated chan []*api.Endpoints waits int } func NewEndpointsHandlerMock() *EndpointsHandlerMock { - return &EndpointsHandlerMock{updated: make(chan []api.Endpoints, 5)} + return &EndpointsHandlerMock{updated: make(chan []*api.Endpoints, 5)} } -func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []api.Endpoints) { +func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []*api.Endpoints) { sort.Sort(sortedEndpoints(endpoints)) h.updated <- endpoints } -func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []api.Endpoints) { +func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*api.Endpoints) { // We might get 1 or more updates for N endpoint updates, because we // over write older snapshots of endpoints from the producer go-routine // if the consumer falls behind. Unittests will hard timeout in 5m. - var endpoints []api.Endpoints + var endpoints []*api.Endpoints for { select { case endpoints = <-h.updated: @@ -254,7 +254,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing. channelOne <- endpointsUpdate1 channelTwo <- endpointsUpdate2 - endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints} + endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) } @@ -285,7 +285,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t channelOne <- endpointsUpdate1 channelTwo <- endpointsUpdate2 - endpoints := []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints} + endpoints := []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) @@ -298,7 +298,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t }}, }) channelTwo <- endpointsUpdate3 - endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints} + endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) @@ -311,7 +311,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t }}, }) channelOne <- endpointsUpdate1 - endpoints = []api.Endpoints{*endpointsUpdate2.Endpoints, *endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints} + endpoints = []*api.Endpoints{endpointsUpdate2.Endpoints, endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) @@ -319,7 +319,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}}) channelTwo <- endpointsUpdate2 - endpoints = []api.Endpoints{*endpointsUpdate1.Endpoints, *endpointsUpdate3.Endpoints} + endpoints = []*api.Endpoints{endpointsUpdate1.Endpoints, endpointsUpdate3.Endpoints} handler.ValidateEndpoints(t, endpoints) handler2.ValidateEndpoints(t, endpoints) } diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b2f6c0b012..f79f9053a0 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -197,9 +197,14 @@ type Proxier struct { serviceMap proxyServiceMap endpointsMap proxyEndpointMap portsMap map[localPort]closeable - haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event - allEndpoints []api.Endpoints // nil until we have seen an OnEndpointsUpdate event - throttle flowcontrol.RateLimiter + haveReceivedServiceUpdate bool // true once we've seen an OnServiceUpdate event + // allEndpoints should never be modified by proxier - the pointers + // are shared with higher layers of kube-proxy. They are guaranteed + // to not be modified in the meantime, but also require to be not + // modified by Proxier. + // nil until we have seen an OnEndpointsUpdate event. + allEndpoints []*api.Endpoints + throttle flowcontrol.RateLimiter // These are effectively const and do not need the mutex to be held. syncPeriod time.Duration @@ -559,7 +564,7 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { } // OnEndpointsUpdate takes in a slice of updated endpoints. -func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { +func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { proxier.mu.Lock() defer proxier.mu.Unlock() if proxier.allEndpoints == nil { @@ -580,7 +585,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []api.Endpoints) { } // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. -func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, hostname string, +func updateEndpoints(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string, healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) { // return values @@ -589,7 +594,7 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, host // Update endpoints for services. for i := range allEndpoints { - accumulateEndpointsMap(&allEndpoints[i], hostname, curMap, &newMap) + accumulateEndpointsMap(allEndpoints[i], hostname, curMap, &newMap) } // Check stale connections against endpoints missing from the update. // TODO: we should really only mark a connection stale if the proto was UDP @@ -630,6 +635,8 @@ func updateEndpoints(allEndpoints []api.Endpoints, curMap proxyEndpointMap, host // scope - it only knows one Endpoints, but sees the whole current map. That // cleanup has to be done above. // +// NOTE: endpoints object should NOT be modified. +// // TODO: this could be simplified: // - hostPortInfo and endpointsInfo overlap too much // - the test for this is overlapped by the test for updateEndpoints diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index b3ac6390db..abb6a41b66 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -369,7 +369,7 @@ func NewFakeProxier(ipt utiliptables.Interface) *Proxier { serviceMap: make(map[proxy.ServicePortName]*serviceInfo), iptables: ipt, clusterCIDR: "10.0.0.0/24", - allEndpoints: []api.Endpoints{}, + allEndpoints: []*api.Endpoints{}, haveReceivedServiceUpdate: true, hostname: testHostname, portsMap: make(map[localPort]closeable), @@ -570,7 +570,7 @@ func TestClusterIPEndpointsJump(t *testing.T) { ip := "10.180.0.1" port := 80 ep := fmt.Sprintf("%s:%d", ip, port) - allEndpoints := []api.Endpoints{ + allEndpoints := []*api.Endpoints{ makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -624,7 +624,7 @@ func TestLoadBalancer(t *testing.T) { ip := "10.180.0.1" port := 80 - fp.allEndpoints = []api.Endpoints{ + fp.allEndpoints = []*api.Endpoints{ makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -669,7 +669,7 @@ func TestNodePort(t *testing.T) { ip := "10.180.0.1" port := 80 - fp.allEndpoints = []api.Endpoints{ + fp.allEndpoints = []*api.Endpoints{ makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -732,7 +732,7 @@ func TestOnlyLocalLoadBalancing(t *testing.T) { port := 80 nonLocalEp := fmt.Sprintf("%s:%d", ip1, port) localEp := fmt.Sprintf("%s:%d", ip2, port) - allEndpoints := []api.Endpoints{ + allEndpoints := []*api.Endpoints{ makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -810,7 +810,7 @@ func onlyLocalNodePorts(t *testing.T, fp *Proxier, ipt *iptablestest.FakeIPTable port := 80 nonLocalEp := fmt.Sprintf("%s:%d", ip1, port) localEp := fmt.Sprintf("%s:%d", ip2, port) - allEndpoints := []api.Endpoints{ + allEndpoints := []*api.Endpoints{ makeTestEndpoints("ns1", svcName, func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1147,7 +1147,7 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // This is a coarse test, but it offers some modicum of confidence as the code is evolved. func Test_accumulateEndpointsMap(t *testing.T) { testCases := []struct { - newEndpoints api.Endpoints + newEndpoints *api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedNew map[proxy.ServicePortName][]*endpointsInfo }{{ @@ -1356,7 +1356,7 @@ func Test_accumulateEndpointsMap(t *testing.T) { for tci, tc := range testCases { // outputs newEndpoints := make(proxyEndpointMap) - accumulateEndpointsMap(&tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints) + accumulateEndpointsMap(tc.newEndpoints, "host", tc.oldEndpoints, &newEndpoints) if len(newEndpoints) != len(tc.expectedNew) { t.Errorf("[%d] expected %d new, got %d: %v", tci, len(tc.expectedNew), len(newEndpoints), spew.Sdump(newEndpoints)) @@ -1375,14 +1375,14 @@ func Test_accumulateEndpointsMap(t *testing.T) { } } -func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) api.Endpoints { - ept := api.Endpoints{ +func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *api.Endpoints { + ept := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, }, } - eptFunc(&ept) + eptFunc(ept) return ept } @@ -1398,19 +1398,19 @@ func makeServicePortName(ns, name, port string) proxy.ServicePortName { func Test_updateEndpoints(t *testing.T) { testCases := []struct { - newEndpoints []api.Endpoints + newEndpoints []*api.Endpoints oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStale []endpointServicePair }{{ // Case[0]: nothing - newEndpoints: []api.Endpoints{}, + newEndpoints: []*api.Endpoints{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStale: []endpointServicePair{}, }, { // Case[1]: no change, unnamed port - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1435,7 +1435,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[2]: no change, named port - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1461,7 +1461,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[3]: no change, multiple subsets - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1501,7 +1501,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[4]: no change, multiple subsets, multiple ports - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1550,7 +1550,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1652,7 +1652,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[6]: add an Endpoints - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1673,7 +1673,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[7]: remove an Endpoints - newEndpoints: []api.Endpoints{ /* empty */ }, + newEndpoints: []*api.Endpoints{ /* empty */ }, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{ makeServicePortName("ns1", "ep1", ""): { {"1.1.1.1:11", false}, @@ -1686,7 +1686,7 @@ func Test_updateEndpoints(t *testing.T) { }}, }, { // Case[8]: add an IP and port - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1722,7 +1722,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[9]: remove an IP and port - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1762,7 +1762,7 @@ func Test_updateEndpoints(t *testing.T) { }}, }, { // Case[10]: add a subset - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1799,7 +1799,7 @@ func Test_updateEndpoints(t *testing.T) { expectedStale: []endpointServicePair{}, }, { // Case[11]: remove a subset - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1831,7 +1831,7 @@ func Test_updateEndpoints(t *testing.T) { }}, }, { // Case[12]: rename a port - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1860,7 +1860,7 @@ func Test_updateEndpoints(t *testing.T) { }}, }, { // Case[13]: renumber a port - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ @@ -1889,7 +1889,7 @@ func Test_updateEndpoints(t *testing.T) { }}, }, { // Case[14]: complex add and remove - newEndpoints: []api.Endpoints{ + newEndpoints: []*api.Endpoints{ makeTestEndpoints("ns1", "ep1", func(ept *api.Endpoints) { ept.Subsets = []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{ diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index ed5080c202..dfbb0c6593 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -202,7 +202,7 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -232,7 +232,7 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -262,7 +262,7 @@ func TestUDPProxy(t *testing.T) { func TestUDPProxyTimeout(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -297,7 +297,7 @@ func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} - lb.OnEndpointsUpdate([]api.Endpoints{{ + lb.OnEndpointsUpdate([]*api.Endpoints{{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, @@ -397,7 +397,7 @@ func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -444,7 +444,7 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -485,7 +485,7 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -525,7 +525,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -565,14 +565,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := api.Endpoints{ + endpoint := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) fexec := makeFakeExec() @@ -601,7 +601,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ @@ -621,14 +621,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := api.Endpoints{ + endpoint := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, }}, } - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) fexec := makeFakeExec() @@ -657,7 +657,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []api.ServicePort{{ @@ -677,7 +677,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -728,7 +728,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -776,7 +776,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -831,14 +831,14 @@ func TestProxyUpdatePublicIPs(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := api.Endpoints{ + endpoint := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) fexec := makeFakeExec() @@ -890,7 +890,7 @@ func TestProxyUpdatePortal(t *testing.T) { Protocol: "TCP", }}}, }}) - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) svcInfo, exists = p.getServiceInfo(service) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index ca0ccc6edd..5d58c3c25e 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -246,14 +246,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn // OnEndpointsUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) { +func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { registeredEndpoints := make(map[proxy.ServicePortName]bool) lb.lock.Lock() defer lb.lock.Unlock() // Update endpoints for services. for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] + // svcEndpoints object should NOT be modified. + svcEndpoints := allEndpoints[i] // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. diff --git a/pkg/proxy/userspace/roundrobin_test.go b/pkg/proxy/userspace/roundrobin_test.go index 0e744c7ea4..732179d808 100644 --- a/pkg/proxy/userspace/roundrobin_test.go +++ b/pkg/proxy/userspace/roundrobin_test.go @@ -67,7 +67,7 @@ func TestFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - var endpoints []api.Endpoints + var endpoints []*api.Endpoints loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil, false) @@ -106,8 +106,8 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, @@ -144,8 +144,8 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint"}}, @@ -172,8 +172,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -215,8 +215,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -255,7 +255,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -289,7 +289,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} + endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} loadBalancer.OnEndpointsUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) @@ -306,8 +306,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 2) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 2) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -316,7 +316,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - endpoints[1] = api.Endpoints{ + endpoints[1] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -364,8 +364,8 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -420,8 +420,8 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { } // Call OnEndpointsUpdate() before NewService() - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -482,8 +482,8 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -503,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -525,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -556,8 +556,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -577,7 +577,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -596,7 +596,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} + endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} loadBalancer.OnEndpointsUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint(service, nil, false) @@ -616,8 +616,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 2) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 2) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -628,7 +628,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) - endpoints[1] = api.Endpoints{ + endpoints[1] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -685,8 +685,8 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 925ea068c1..3f3cfbbd83 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -216,7 +216,7 @@ func getPortNum(t *testing.T, addr string) int { func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -245,7 +245,7 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -274,7 +274,7 @@ func TestUDPProxy(t *testing.T) { func TestUDPProxyTimeout(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -308,7 +308,7 @@ func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} - lb.OnEndpointsUpdate([]api.Endpoints{{ + lb.OnEndpointsUpdate([]*api.Endpoints{{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, @@ -410,7 +410,7 @@ func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -456,7 +456,7 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -496,7 +496,7 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -536,7 +536,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, Subsets: []api.EndpointSubset{{ @@ -575,14 +575,14 @@ func TestUDPProxyUpdateDelete(t *testing.T) { func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := api.Endpoints{ + endpoint := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -610,7 +610,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ @@ -630,14 +630,14 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := api.Endpoints{ + endpoint := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, }}, } - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -665,7 +665,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) p.OnServiceUpdate([]api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ @@ -685,7 +685,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -735,7 +735,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -782,7 +782,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]api.Endpoints{ + lb.OnEndpointsUpdate([]*api.Endpoints{ { ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ @@ -836,14 +836,14 @@ func TestProxyUpdatePublicIPs(t *testing.T) { func TestProxyUpdatePortal(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - endpoint := api.Endpoints{ + endpoint := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -894,7 +894,7 @@ func TestProxyUpdatePortal(t *testing.T) { Protocol: "TCP", }}}, }}) - lb.OnEndpointsUpdate([]api.Endpoints{endpoint}) + lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) svcInfo, exists = p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index df2e5bc3be..c7daa68c82 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -236,14 +236,15 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn // OnEndpointsUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []api.Endpoints) { +func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { registeredEndpoints := make(map[proxy.ServicePortName]bool) lb.lock.Lock() defer lb.lock.Unlock() // Update endpoints for services. for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] + // svcEndpoints should NOT be modified. + svcEndpoints := allEndpoints[i] // We need to build a map of portname -> all ip:ports for that // portname. Explode Endpoints.Subsets[*] into this structure. diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go index 8983e2e92b..0e4356050a 100644 --- a/pkg/proxy/winuserspace/roundrobin_test.go +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -67,7 +67,7 @@ func TestFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - var endpoints []api.Endpoints + var endpoints []*api.Endpoints loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil, false) @@ -106,8 +106,8 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, @@ -144,8 +144,8 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint"}}, @@ -172,8 +172,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -215,8 +215,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -255,7 +255,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -289,7 +289,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} + endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} loadBalancer.OnEndpointsUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) @@ -306,8 +306,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]api.Endpoints, 2) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 2) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -316,7 +316,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - endpoints[1] = api.Endpoints{ + endpoints[1] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -364,8 +364,8 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -420,8 +420,8 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { } // Call OnEndpointsUpdate() before NewService() - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -482,8 +482,8 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -503,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -525,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -556,8 +556,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -577,7 +577,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = api.Endpoints{ + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -596,7 +596,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} + endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} loadBalancer.OnEndpointsUpdate(endpoints) endpoint, err = loadBalancer.NextEndpoint(service, nil, false) @@ -616,8 +616,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 2) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 2) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -628,7 +628,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) - endpoints[1] = api.Endpoints{ + endpoints[1] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -685,8 +685,8 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ + endpoints := make([]*api.Endpoints, 1) + endpoints[0] = &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}},