proxy/userspace: respect minSyncInterval and simplify locking

The userspace proxy does not have any ratelimiting and when many
services are used will hammer iptables every time a service or
endpoint change occurs. Instead build up a map of changed
services and process all those changes at once instead of each
time an event comes in. This also ensures that no long-running
processing happens in the same call chain as the OnService*
calls as this blocks other handlers attached to the proxy's
parent ServiceConfig object for long periods of time.

Locking can also now be simplified as the only accesses to the
proxy's serviceMap happen from syncProxyRules(). So instead of
locking in many functions just lock once in syncProxyRules()
like the other proxies do.

https://bugzilla.redhat.com/show_bug.cgi?id=1590589
https://bugzilla.redhat.com/show_bug.cgi?id=1689690
k3s-v1.15.3
Dan Williams 2019-03-29 09:25:21 -05:00
parent cf7225f561
commit 8cf0076e23
3 changed files with 357 additions and 93 deletions

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/proxy:go_default_library",
"//pkg/proxy/config:go_default_library",
"//pkg/proxy/util:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/conntrack:go_default_library",
"//pkg/util/iptables:go_default_library",
"//pkg/util/slice:go_default_library",
@ -86,6 +87,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/exec/testing:go_default_library",
],

View File

@ -19,6 +19,7 @@ package userspace
import (
"fmt"
"net"
"reflect"
"strconv"
"strings"
"sync"
@ -35,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/proxy"
utilproxy "k8s.io/kubernetes/pkg/proxy/util"
"k8s.io/kubernetes/pkg/util/async"
"k8s.io/kubernetes/pkg/util/conntrack"
"k8s.io/kubernetes/pkg/util/iptables"
utilexec "k8s.io/utils/exec"
@ -91,6 +93,19 @@ func logTimeout(err error) bool {
// ProxySocketFunc is a function which constructs a ProxySocket from a protocol, ip, and port
type ProxySocketFunc func(protocol v1.Protocol, ip net.IP, port int) (ProxySocket, error)
const numBurstSyncs int = 2
type serviceChange struct {
current *v1.Service
previous *v1.Service
}
// Interface for async runner; abstracted for testing
type asyncRunnerInterface interface {
Run()
Loop(<-chan struct{})
}
// Proxier is a simple proxy for TCP connections between a localhost:lport
// and services that provide the actual implementations.
type Proxier struct {
@ -98,7 +113,7 @@ type Proxier struct {
mu sync.Mutex // protects serviceMap
serviceMap map[proxy.ServicePortName]*ServiceInfo
syncPeriod time.Duration
minSyncPeriod time.Duration // unused atm, but plumbed through
minSyncPeriod time.Duration
udpIdleTimeout time.Duration
portMapMutex sync.Mutex
portMap map[portMapKey]*portMapValue
@ -115,6 +130,10 @@ type Proxier struct {
endpointsSynced int32
servicesSynced int32
initialized int32
// protects serviceChanges
serviceChangesLock sync.Mutex
serviceChanges map[types.NamespacedName]*serviceChange // map of service changes
syncRunner asyncRunnerInterface // governs calls to syncProxyRules
stopChan chan struct{}
}
@ -210,12 +229,12 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
if err := iptablesFlush(iptables); err != nil {
return nil, fmt.Errorf("failed to flush iptables: %v", err)
}
return &Proxier{
proxier := &Proxier{
loadBalancer: loadBalancer,
serviceMap: make(map[proxy.ServicePortName]*ServiceInfo),
serviceChanges: make(map[types.NamespacedName]*serviceChange),
portMap: make(map[portMapKey]*portMapValue),
syncPeriod: syncPeriod,
// plumbed through if needed, not used atm.
minSyncPeriod: minSyncPeriod,
udpIdleTimeout: udpIdleTimeout,
listenIP: listenIP,
@ -225,7 +244,10 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables
makeProxySocket: makeProxySocket,
exec: exec,
stopChan: make(chan struct{}),
}, nil
}
klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, numBurstSyncs)
proxier.syncRunner = async.NewBoundedFrequencyRunner("userspace-proxy-sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, numBurstSyncs)
return proxier, nil
}
// CleanupLeftovers removes all iptables rules and chains created by the Proxier
@ -299,14 +321,13 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) {
// shutdown closes all service port proxies and returns from the proxy's
// sync loop. Used from testcases.
func (proxier *Proxier) shutdown() {
defer proxier.cleanupStaleStickySessions()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for serviceName, info := range proxier.serviceMap {
proxier.stopProxyInternal(serviceName, info)
proxier.stopProxy(serviceName, info)
}
proxier.cleanupStaleStickySessions()
close(proxier.stopChan)
}
@ -314,34 +335,52 @@ func (proxier *Proxier) isInitialized() bool {
return atomic.LoadInt32(&proxier.initialized) > 0
}
// Sync is called to immediately synchronize the proxier state to iptables
// Sync is called to synchronize the proxier state to iptables as soon as possible.
func (proxier *Proxier) Sync() {
proxier.syncRunner.Run()
}
func (proxier *Proxier) syncProxyRules() {
start := time.Now()
defer func() {
klog.V(2).Infof("userspace syncProxyRules took %v", time.Since(start))
}()
// don't sync rules till we've received services and endpoints
if !proxier.isInitialized() {
klog.V(2).Info("Not syncing userspace proxy until Services and Endpoints have been received from master")
return
}
if err := iptablesInit(proxier.iptables); err != nil {
klog.Errorf("Failed to ensure iptables: %v", err)
}
proxier.serviceChangesLock.Lock()
changes := proxier.serviceChanges
proxier.serviceChanges = make(map[types.NamespacedName]*serviceChange)
proxier.serviceChangesLock.Unlock()
proxier.mu.Lock()
defer proxier.mu.Unlock()
klog.V(2).Infof("userspace proxy: processing %d service events", len(changes))
for _, change := range changes {
existingPorts := proxier.mergeService(change.current)
proxier.unmergeService(change.previous, existingPorts)
}
proxier.ensurePortals()
proxier.cleanupStaleStickySessions()
}
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
t := time.NewTicker(proxier.syncPeriod)
defer t.Stop()
for {
select {
case <-t.C:
klog.V(6).Infof("Periodic sync")
proxier.Sync()
case <-proxier.stopChan:
return
}
}
proxier.syncRunner.Loop(proxier.stopChan)
}
// Ensure that portals exist for all services.
func (proxier *Proxier) ensurePortals() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
// NB: This does not remove rules that should not be present.
for name, info := range proxier.serviceMap {
err := proxier.openPortal(name, info)
@ -353,22 +392,12 @@ func (proxier *Proxier) ensurePortals() {
// clean up any stale sticky session records in the hash map.
func (proxier *Proxier) cleanupStaleStickySessions() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
for name := range proxier.serviceMap {
proxier.loadBalancer.CleanupStaleStickySessions(name)
}
}
// This assumes proxier.mu is not locked.
func (proxier *Proxier) stopProxy(service proxy.ServicePortName, info *ServiceInfo) error {
proxier.mu.Lock()
defer proxier.mu.Unlock()
return proxier.stopProxyInternal(service, info)
}
// This assumes proxier.mu is locked.
func (proxier *Proxier) stopProxyInternal(service proxy.ServicePortName, info *ServiceInfo) error {
delete(proxier.serviceMap, service)
info.setAlive(false)
err := info.socket.Close()
@ -384,16 +413,18 @@ func (proxier *Proxier) getServiceInfo(service proxy.ServicePortName) (*ServiceI
return info, ok
}
func (proxier *Proxier) setServiceInfo(service proxy.ServicePortName, info *ServiceInfo) {
// addServiceOnPort lockes the proxy before calling addServiceOnPortInternal.
// Used from testcases.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
proxier.mu.Lock()
defer proxier.mu.Unlock()
proxier.serviceMap[service] = info
return proxier.addServiceOnPortInternal(service, protocol, proxyPort, timeout)
}
// addServiceOnPort starts listening for a new service, returning the ServiceInfo.
// addServiceOnPortInternal starts listening for a new service, returning the ServiceInfo.
// Pass proxyPort=0 to allocate a random port. The timeout only applies to UDP
// connections, for now.
func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
func (proxier *Proxier) addServiceOnPortInternal(service proxy.ServicePortName, protocol v1.Protocol, proxyPort int, timeout time.Duration) (*ServiceInfo, error) {
sock, err := proxier.makeProxySocket(protocol, proxier.listenIP, proxyPort)
if err != nil {
return nil, err
@ -417,7 +448,7 @@ func (proxier *Proxier) addServiceOnPort(service proxy.ServicePortName, protocol
socket: sock,
sessionAffinityType: v1.ServiceAffinityNone, // default
}
proxier.setServiceInfo(service, si)
proxier.serviceMap[service] = si
klog.V(2).Infof("Proxying for service %q on %s port %d", service, protocol, portNum)
go func(service proxy.ServicePortName, proxier *Proxier) {
@ -444,7 +475,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
servicePort := &service.Spec.Ports[i]
serviceName := proxy.ServicePortName{NamespacedName: svcName, Port: servicePort.Name}
existingPorts.Insert(servicePort.Name)
info, exists := proxier.getServiceInfo(serviceName)
info, exists := proxier.serviceMap[serviceName]
// TODO: check health of the socket? What if ProxyLoop exited?
if exists && sameConfig(info, service, servicePort) {
// Nothing changed.
@ -467,7 +498,7 @@ func (proxier *Proxier) mergeService(service *v1.Service) sets.String {
serviceIP := net.ParseIP(service.Spec.ClusterIP)
klog.V(1).Infof("Adding new service %q at %s/%s", serviceName, net.JoinHostPort(serviceIP.String(), strconv.Itoa(int(servicePort.Port))), servicePort.Protocol)
info, err = proxier.addServiceOnPort(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
info, err = proxier.addServiceOnPortInternal(serviceName, servicePort.Protocol, proxyPort, proxier.udpIdleTimeout)
if err != nil {
klog.Errorf("Failed to start proxy for %q: %v", serviceName, err)
continue
@ -504,10 +535,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
klog.V(3).Infof("Skipping service %s due to clusterIP = %q", svcName, service.Spec.ClusterIP)
return
}
staleUDPServices := sets.NewString()
proxier.mu.Lock()
defer proxier.mu.Unlock()
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
if existingPorts.Has(servicePort.Name) {
@ -529,7 +557,7 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
if err := proxier.closePortal(serviceName, info); err != nil {
klog.Errorf("Failed to close portal for %q: %v", serviceName, err)
}
if err := proxier.stopProxyInternal(serviceName, info); err != nil {
if err := proxier.stopProxy(serviceName, info); err != nil {
klog.Errorf("Failed to stop service %q: %v", serviceName, err)
}
proxier.loadBalancer.DeleteService(serviceName)
@ -541,17 +569,50 @@ func (proxier *Proxier) unmergeService(service *v1.Service, existingPorts sets.S
}
}
func (proxier *Proxier) serviceChange(previous, current *v1.Service, detail string) {
var svcName types.NamespacedName
if current != nil {
svcName = types.NamespacedName{Namespace: current.Namespace, Name: current.Name}
} else {
svcName = types.NamespacedName{Namespace: previous.Namespace, Name: previous.Name}
}
klog.V(4).Infof("userspace proxy: %s for %s", detail, svcName)
proxier.serviceChangesLock.Lock()
defer proxier.serviceChangesLock.Unlock()
change, exists := proxier.serviceChanges[svcName]
if !exists {
// change.previous is only set for new changes. We must keep
// the oldest service info (or nil) because correct unmerging
// depends on the next update/del after a merge, not subsequent
// updates.
change = &serviceChange{previous: previous}
proxier.serviceChanges[svcName] = change
}
// Always use the most current service (or nil) as change.current
change.current = current
if reflect.DeepEqual(change.previous, change.current) {
// collapsed change had no effect
delete(proxier.serviceChanges, svcName)
} else if proxier.isInitialized() {
// change will have an effect, ask the proxy to sync
proxier.syncRunner.Run()
}
}
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
_ = proxier.mergeService(service)
proxier.serviceChange(nil, service, "OnServiceAdd")
}
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
existingPorts := proxier.mergeService(service)
proxier.unmergeService(oldService, existingPorts)
proxier.serviceChange(oldService, service, "OnServiceUpdate")
}
func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
proxier.unmergeService(service, sets.NewString())
proxier.serviceChange(service, nil, "OnServiceDelete")
}
func (proxier *Proxier) OnServiceSynced() {
@ -563,6 +624,11 @@ func (proxier *Proxier) OnServiceSynced() {
if atomic.LoadInt32(&proxier.endpointsSynced) > 0 {
atomic.StoreInt32(&proxier.initialized, 1)
}
// Must sync from a goroutine to avoid blocking the
// service event handler on startup with large numbers
// of initial objects
go proxier.syncProxyRules()
}
func (proxier *Proxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
@ -587,6 +653,11 @@ func (proxier *Proxier) OnEndpointsSynced() {
if atomic.LoadInt32(&proxier.servicesSynced) > 0 {
atomic.StoreInt32(&proxier.initialized, 1)
}
// Must sync from a goroutine to avoid blocking the
// service event handler on startup with large numbers
// of initial objects
go proxier.syncProxyRules()
}
func sameConfig(info *ServiceInfo, service *v1.Service, port *v1.ServicePort) bool {

View File

@ -24,6 +24,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"reflect"
"strconv"
"sync/atomic"
"testing"
@ -33,6 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/proxy"
ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
"k8s.io/utils/exec"
@ -86,6 +88,16 @@ func waitForClosedPortUDP(p *Proxier, proxyPort int) error {
return fmt.Errorf("port %d still open", proxyPort)
}
func waitForServiceInfo(p *Proxier, service proxy.ServicePortName) (*ServiceInfo, bool) {
var svcInfo *ServiceInfo
var exists bool
wait.PollImmediate(50*time.Millisecond, 3*time.Second, func() (bool, error) {
svcInfo, exists = p.getServiceInfo(service)
return exists, nil
})
return svcInfo, exists
}
// udpEchoServer is a simple echo server in UDP, intended for testing the proxy.
type udpEchoServer struct {
net.PacketConn
@ -225,6 +237,15 @@ func waitForNumProxyClients(t *testing.T, s *ServiceInfo, want int, timeout time
t.Errorf("expected %d ProxyClients live, got %d", want, got)
}
func startProxier(p *Proxier, t *testing.T) {
go func() {
p.SyncLoop()
}()
waitForNumProxyLoops(t, p, 0)
p.OnServiceSynced()
p.OnEndpointsSynced()
}
func TestTCPProxy(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
@ -242,7 +263,7 @@ func TestTCPProxy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -270,7 +291,7 @@ func TestUDPProxy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -298,7 +319,7 @@ func TestUDPProxyTimeout(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -338,7 +359,7 @@ func TestMultiPortProxy(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second)
@ -368,7 +389,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
p.OnServiceAdd(&v1.Service{
@ -384,7 +405,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
}}},
})
waitForNumProxyLoops(t, p, 2)
svcInfo, exists := p.getServiceInfo(serviceP)
svcInfo, exists := waitForServiceInfo(p, serviceP)
if !exists {
t.Fatalf("can't find serviceInfo for %s", serviceP)
}
@ -392,7 +413,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
t.Errorf("unexpected serviceInfo for %s: %#v", serviceP, svcInfo)
}
svcInfo, exists = p.getServiceInfo(serviceQ)
svcInfo, exists = waitForServiceInfo(p, serviceQ)
if !exists {
t.Fatalf("can't find serviceInfo for %s", serviceQ)
}
@ -408,7 +429,9 @@ func TestMultiPortOnServiceAdd(t *testing.T) {
// Helper: Stops the proxy for the named service.
func stopProxyByName(proxier *Proxier, service proxy.ServicePortName) error {
info, found := proxier.getServiceInfo(service)
proxier.mu.Lock()
defer proxier.mu.Unlock()
info, found := proxier.serviceMap[service]
if !found {
return fmt.Errorf("unknown service: %s", service)
}
@ -432,7 +455,7 @@ func TestTCPProxyStop(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -477,7 +500,7 @@ func TestUDPProxyStop(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -501,9 +524,9 @@ func TestUDPProxyStop(t *testing.T) {
func TestTCPProxyUpdateDelete(t *testing.T) {
lb := NewLoadBalancerRR()
service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
servicePortName := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"}
lb.OnEndpointsAdd(&v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name},
ObjectMeta: metav1.ObjectMeta{Namespace: servicePortName.Namespace, Name: servicePortName.Name},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "127.0.0.1"}},
Ports: []v1.EndpointPort{{Name: "p", Port: tcpServerPort}},
@ -516,29 +539,22 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
if err != nil {
t.Fatalf("error adding new service: %#v", err)
}
conn, err := net.Dial("tcp", joinHostPort("", svcInfo.proxyPort))
if err != nil {
t.Fatalf("error connecting to proxy: %v", err)
}
conn.Close()
waitForNumProxyLoops(t, p, 1)
p.OnServiceDelete(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace},
service := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: servicePortName.Name, Namespace: servicePortName.Namespace},
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: int32(svcInfo.proxyPort),
Port: 9997,
Protocol: "TCP",
}}},
})
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
}
p.OnServiceAdd(service)
waitForNumProxyLoops(t, p, 1)
p.OnServiceDelete(service)
if err := waitForClosedPortTCP(p, int(service.Spec.Ports[0].Port)); err != nil {
t.Fatalf(err.Error())
}
waitForNumProxyLoops(t, p, 0)
@ -561,7 +577,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -607,7 +623,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -644,7 +660,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
Protocol: "TCP",
}}},
})
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := waitForServiceInfo(p, service)
if !exists {
t.Fatalf("can't find serviceInfo for %s", service)
}
@ -670,7 +686,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -707,7 +723,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
Protocol: "UDP",
}}},
})
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := waitForServiceInfo(p, service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
@ -732,7 +748,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -754,7 +770,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := waitForServiceInfo(p, service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
@ -781,7 +797,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second)
@ -802,7 +818,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
if err := waitForClosedPortUDP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := waitForServiceInfo(p, service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
@ -827,7 +843,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -853,7 +869,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) {
if err := waitForClosedPortTCP(p, svcInfo.proxyPort); err != nil {
t.Fatalf(err.Error())
}
svcInfo, exists := p.getServiceInfo(service)
svcInfo, exists := waitForServiceInfo(p, service)
if !exists {
t.Fatalf("can't find serviceInfo")
}
@ -881,7 +897,7 @@ func TestProxyUpdatePortal(t *testing.T) {
if err != nil {
t.Fatal(err)
}
waitForNumProxyLoops(t, p, 0)
startProxier(p, t)
defer p.shutdown()
svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second)
@ -909,7 +925,16 @@ func TestProxyUpdatePortal(t *testing.T) {
}}},
}
p.OnServiceUpdate(svcv0, svcv1)
_, exists := p.getServiceInfo(service)
// Wait for the service to be removed because it had an empty ClusterIP
var exists bool
for i := 0; i < 50; i++ {
_, exists = p.getServiceInfo(service)
if !exists {
break
}
time.Sleep(50 * time.Millisecond)
}
if exists {
t.Fatalf("service with empty ClusterIP should not be included in the proxy")
}
@ -938,7 +963,7 @@ func TestProxyUpdatePortal(t *testing.T) {
}
p.OnServiceUpdate(svcv2, svcv3)
lb.OnEndpointsAdd(endpoint)
svcInfo, exists = p.getServiceInfo(service)
svcInfo, exists = waitForServiceInfo(p, service)
if !exists {
t.Fatalf("service with ClusterIP set not found in the proxy")
}
@ -946,6 +971,172 @@ func TestProxyUpdatePortal(t *testing.T) {
waitForNumProxyLoops(t, p, 1)
}
type fakeRunner struct{}
// assert fakeAsyncRunner is a ProxyProvider
var _ asyncRunnerInterface = &fakeRunner{}
func (f fakeRunner) Run() {
}
func (f fakeRunner) Loop(stop <-chan struct{}) {
}
func TestOnServiceAddChangeMap(t *testing.T) {
fexec := makeFakeExec()
// Use long minSyncPeriod so we can test that immediate syncs work
p, err := createProxier(NewLoadBalancerRR(), net.ParseIP("0.0.0.0"), ipttest.NewFake(), fexec, net.ParseIP("127.0.0.1"), nil, time.Minute, time.Minute, udpIdleTimeoutForTest, newProxySocket)
if err != nil {
t.Fatal(err)
}
// Fake out sync runner
p.syncRunner = fakeRunner{}
serviceMeta := metav1.ObjectMeta{Namespace: "testnamespace", Name: "testname"}
service := &v1.Service{
ObjectMeta: serviceMeta,
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.4", Ports: []v1.ServicePort{{
Name: "p",
Port: 99,
Protocol: "TCP",
}}},
}
serviceUpdate := &v1.Service{
ObjectMeta: serviceMeta,
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.5", Ports: []v1.ServicePort{{
Name: "p",
Port: 100,
Protocol: "TCP",
}}},
}
serviceUpdate2 := &v1.Service{
ObjectMeta: serviceMeta,
Spec: v1.ServiceSpec{ClusterIP: "1.2.3.6", Ports: []v1.ServicePort{{
Name: "p",
Port: 101,
Protocol: "TCP",
}}},
}
type onServiceTest struct {
detail string
changes []serviceChange
expectedChange *serviceChange
}
tests := []onServiceTest{
{
detail: "add",
changes: []serviceChange{
{current: service},
},
expectedChange: &serviceChange{
current: service,
},
},
{
detail: "add+update=add",
changes: []serviceChange{
{current: service},
{
previous: service,
current: serviceUpdate,
},
},
expectedChange: &serviceChange{
current: serviceUpdate,
},
},
{
detail: "add+del=none",
changes: []serviceChange{
{current: service},
{previous: service},
},
},
{
detail: "update+update=update",
changes: []serviceChange{
{
previous: service,
current: serviceUpdate,
},
{
previous: serviceUpdate,
current: serviceUpdate2,
},
},
expectedChange: &serviceChange{
previous: service,
current: serviceUpdate2,
},
},
{
detail: "update+del=del",
changes: []serviceChange{
{
previous: service,
current: serviceUpdate,
},
{previous: serviceUpdate},
},
// change collapsing always keeps the oldest service
// info since correct unmerging depends on the least
// recent update, not the most current.
expectedChange: &serviceChange{
previous: service,
},
},
{
detail: "del+add=update",
changes: []serviceChange{
{previous: service},
{current: serviceUpdate},
},
expectedChange: &serviceChange{
previous: service,
current: serviceUpdate,
},
},
}
for _, test := range tests {
for _, change := range test.changes {
p.serviceChange(change.previous, change.current, test.detail)
}
if test.expectedChange != nil {
if len(p.serviceChanges) != 1 {
t.Fatalf("[%s] expected 1 service change but found %d", test.detail, len(p.serviceChanges))
}
expectedService := test.expectedChange.current
if expectedService == nil {
expectedService = test.expectedChange.previous
}
svcName := types.NamespacedName{Namespace: expectedService.Namespace, Name: expectedService.Name}
change, ok := p.serviceChanges[svcName]
if !ok {
t.Fatalf("[%s] did not find service change for %v", test.detail, svcName)
}
if !reflect.DeepEqual(change.previous, test.expectedChange.previous) {
t.Fatalf("[%s] change previous service and expected previous service don't match\nchange: %+v\nexp: %+v", test.detail, change.previous, test.expectedChange.previous)
}
if !reflect.DeepEqual(change.current, test.expectedChange.current) {
t.Fatalf("[%s] change current service and expected current service don't match\nchange: %+v\nexp: %+v", test.detail, change.current, test.expectedChange.current)
}
} else {
if len(p.serviceChanges) != 0 {
t.Fatalf("[%s] expected no service changes but found %d", test.detail, len(p.serviceChanges))
}
}
}
}
func makeFakeExec() *fakeexec.FakeExec {
fcmd := fakeexec.FakeCmd{
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{