From c3e9467b635fcfb413a29169080630a425623852 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Tue, 11 Apr 2017 09:49:47 +0200 Subject: [PATCH] Edge-based winuserspace proxy --- cmd/kube-proxy/app/server.go | 2 +- pkg/proxy/winuserspace/proxier_test.go | 159 ++++++++++------------ pkg/proxy/winuserspace/roundrobin.go | 149 +++++++++++++------- pkg/proxy/winuserspace/roundrobin_test.go | 91 ++++++------- 4 files changed, 209 insertions(+), 192 deletions(-) diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index f4615be728..d15969124a 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -261,7 +261,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // our config.EndpointsConfigHandler. loadBalancer := winuserspace.NewLoadBalancerRR() // set EndpointsHandler to our loadBalancer - endpointsHandler = loadBalancer + endpointsEventHandler = loadBalancer proxierUserspace, err := winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 59bb6667b7..9334880a1d 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -216,14 +216,12 @@ func getPortNum(t *testing.T, addr string) int { func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -245,14 +243,12 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -274,14 +270,12 @@ func TestUDPProxy(t *testing.T) { func TestUDPProxyTimeout(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -308,19 +302,20 @@ func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} - lb.OnEndpointsUpdate([]*api.Endpoints{{ + lb.OnEndpointsAdd(&api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}}, }}, - }, { + }) + lb.OnEndpointsAdd(&api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}}, }}, - }}) + }) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -410,14 +405,12 @@ func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -456,14 +449,12 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -496,14 +487,12 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -536,14 +525,12 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -582,7 +569,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -610,7 +597,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ @@ -637,7 +624,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -665,7 +652,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ @@ -685,14 +672,12 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -735,14 +720,12 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -782,14 +765,12 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -843,7 +824,7 @@ func TestProxyUpdatePortal(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -894,7 +875,7 @@ func TestProxyUpdatePortal(t *testing.T) { Protocol: "TCP", }}}, }}) - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index c7daa68c82..ff2026bd8f 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -233,65 +233,91 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } } -// OnEndpointsUpdate manages the registered service endpoints. -// Registered endpoints are updated if found in the update set or -// unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { - registeredEndpoints := make(map[proxy.ServicePortName]bool) +// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that +// portname. Explode Endpoints.Subsets[*] into this structure. +func buildPortsToEndpointsMap(endpoints *api.Endpoints) map[string][]hostPortPair { + portsToEndpoints := map[string][]hostPortPair{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) + // Ignore the protocol field - we'll get that from the Service objects. + } + } + } + return portsToEndpoints +} + +func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + lb.lock.Lock() defer lb.lock.Unlock() - // Update endpoints for services. - for i := range allEndpoints { - // svcEndpoints should NOT be modified. - svcEndpoints := allEndpoints[i] + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + state, exists := lb.services[svcPort] - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortPair{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) - // Ignore the protocol field - we'll get that from the Service objects. - } - } - } + if !exists || state == nil || len(newEndpoints) > 0 { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsAdd can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - state, exists := lb.services[svcPort] - curEndpoints := []string{} - if state != nil { - curEndpoints = state.endpoints - } - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - - if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { - glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) - lb.updateAffinityMap(svcPort, newEndpoints) - // OnEndpointsUpdate can be called without NewService being called externally. - // To be safe we will call it here. A new service will only be created - // if one does not already exist. The affinity will be updated - // later, once NewService is called. - state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) - state.endpoints = slice.ShuffleStrings(newEndpoints) - - // Reset the round-robin index. - state.index = 0 - } - registeredEndpoints[svcPort] = true + // Reset the round-robin index. + state.index = 0 } } - // Remove endpoints missing from the update. - for k := range lb.services { - if _, exists := registeredEndpoints[k]; !exists { - glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) +} + +func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints) + registeredEndpoints := make(map[proxy.ServicePortName]bool) + + lb.lock.Lock() + defer lb.lock.Unlock() + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + state, exists := lb.services[svcPort] + + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints + } + + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsUpdate can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 + } + registeredEndpoints[svcPort] = true + } + + for portname := range oldPortsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + if _, exists := registeredEndpoints[svcPort]; !exists { + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) // Reset but don't delete. - state := lb.services[k] + state := lb.services[svcPort] state.endpoints = []string{} state.index = 0 state.affinity.affinityMap = map[string]*affinityState{} @@ -299,6 +325,27 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { } } +func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + + lb.lock.Lock() + defer lb.lock.Unlock() + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) + // If the service is still around, reset but don't delete. + if state, ok := lb.services[svcPort]; ok { + state.endpoints = []string{} + state.index = 0 + state.affinity.affinityMap = map[string]*affinityState{} + } + } +} + +func (lb *LoadBalancerRR) OnEndpointsSynced() { +} + // Tests whether two slices are equivalent. This sorts both slices in-place. func slicesEquiv(lhs, rhs []string) bool { if len(lhs) != len(rhs) { diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go index 0e4356050a..b334a3ccc2 100644 --- a/pkg/proxy/winuserspace/roundrobin_test.go +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -67,8 +67,6 @@ func TestFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - var endpoints []*api.Endpoints - loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil { @@ -106,15 +104,14 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Name: "p", Port: 40}}, }}, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) @@ -144,15 +141,14 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}}, }}, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) shuffledEndpoints := loadBalancer.services[service].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { @@ -172,8 +168,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -186,7 +181,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") { @@ -215,8 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -233,7 +227,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") { @@ -255,7 +249,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -268,7 +262,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") { @@ -289,8 +283,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} - loadBalancer.OnEndpointsUpdate(endpoints) + endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) if err == nil || len(endpoint) != 0 { @@ -306,8 +300,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 2) - endpoints[0] = &api.Endpoints{ + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -316,7 +309,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - endpoints[1] = &api.Endpoints{ + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -325,7 +318,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints1) + loadBalancer.OnEndpointsAdd(endpoints2) shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil) @@ -341,7 +335,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) // Then update the configuration by removing foo - loadBalancer.OnEndpointsUpdate(endpoints[1:]) + loadBalancer.OnEndpointsDelete(endpoints1) endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -364,8 +358,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -373,7 +366,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} @@ -420,15 +413,14 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { } // Call OnEndpointsUpdate() before NewService() - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} @@ -482,8 +474,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -492,7 +483,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] @@ -503,7 +494,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -512,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[service].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] @@ -525,7 +516,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) - endpoints[0] = &api.Endpoints{ + endpointsv3 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -534,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) @@ -556,8 +547,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -566,7 +556,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) @@ -577,7 +567,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -586,7 +576,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) @@ -596,8 +586,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} - loadBalancer.OnEndpointsUpdate(endpoints) + endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) endpoint, err = loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { @@ -616,8 +606,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 2) - endpoints[0] = &api.Endpoints{ + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -628,7 +617,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) - endpoints[1] = &api.Endpoints{ + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -637,7 +626,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints1) + loadBalancer.OnEndpointsAdd(endpoints2) shuffledFooEndpoints := loadBalancer.services[fooService].endpoints expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) @@ -659,7 +649,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) // Then update the configuration by removing foo - loadBalancer.OnEndpointsUpdate(endpoints[1:]) + loadBalancer.OnEndpointsDelete(endpoints1) endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -685,8 +675,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -694,7 +683,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}