From ddab79a233fc159633ae710d50f406f06c69681a Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Sun, 31 Mar 2019 14:41:11 -0500 Subject: [PATCH] proxy/userspace: add proxy shutdown function and use in testcases If a testcase does time out and 'go test' prints the call stack, make sure everything from previous tests is cleaned up so the call stack is easier to understand. --- pkg/proxy/userspace/proxier.go | 27 ++++++++++++++++++++++++--- pkg/proxy/userspace/proxier_test.go | 15 +++++++++++++++ 2 files changed, 39 insertions(+), 3 deletions(-) diff --git a/pkg/proxy/userspace/proxier.go b/pkg/proxy/userspace/proxier.go index 52f8586b65..0f57ce81bd 100644 --- a/pkg/proxy/userspace/proxier.go +++ b/pkg/proxy/userspace/proxier.go @@ -109,6 +109,8 @@ type Proxier struct { proxyPorts PortAllocator makeProxySocket ProxySocketFunc exec utilexec.Interface + + stopChan chan struct{} } // assert Proxier is a ProxyProvider @@ -216,6 +218,7 @@ func createProxier(loadBalancer LoadBalancer, listenIP net.IP, iptables iptables proxyPorts: proxyPorts, makeProxySocket: makeProxySocket, exec: exec, + stopChan: make(chan struct{}), }, nil } @@ -287,6 +290,20 @@ func CleanupLeftovers(ipt iptables.Interface) (encounteredError bool) { return encounteredError } +// shutdown closes all service port proxies and returns from the proxy's +// sync loop. Used from testcases. +func (proxier *Proxier) shutdown() { + defer proxier.cleanupStaleStickySessions() + + proxier.mu.Lock() + defer proxier.mu.Unlock() + + for serviceName, info := range proxier.serviceMap { + proxier.stopProxyInternal(serviceName, info) + } + close(proxier.stopChan) +} + // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { if err := iptablesInit(proxier.iptables); err != nil { @@ -301,9 +318,13 @@ func (proxier *Proxier) SyncLoop() { t := time.NewTicker(proxier.syncPeriod) defer t.Stop() for { - <-t.C - klog.V(6).Infof("Periodic sync") - proxier.Sync() + select { + case <-t.C: + klog.V(6).Infof("Periodic sync") + proxier.Sync() + case <-proxier.stopChan: + return + } } } diff --git a/pkg/proxy/userspace/proxier_test.go b/pkg/proxy/userspace/proxier_test.go index cee7eec411..9488c1bb21 100644 --- a/pkg/proxy/userspace/proxier_test.go +++ b/pkg/proxy/userspace/proxier_test.go @@ -243,6 +243,7 @@ func TestTCPProxy(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -270,6 +271,7 @@ func TestUDPProxy(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -297,6 +299,7 @@ func TestUDPProxyTimeout(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -336,6 +339,7 @@ func TestMultiPortProxy(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfoP, err := p.addServiceOnPort(serviceP, "TCP", 0, time.Second) if err != nil { @@ -365,6 +369,7 @@ func TestMultiPortOnServiceAdd(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() p.OnServiceAdd(&v1.Service{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, @@ -428,6 +433,7 @@ func TestTCPProxyStop(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -472,6 +478,7 @@ func TestUDPProxyStop(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -510,6 +517,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -554,6 +562,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -599,6 +608,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -661,6 +671,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -722,6 +733,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -770,6 +782,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "UDP", 0, time.Second) if err != nil { @@ -815,6 +828,7 @@ func TestProxyUpdatePublicIPs(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil { @@ -868,6 +882,7 @@ func TestProxyUpdatePortal(t *testing.T) { t.Fatal(err) } waitForNumProxyLoops(t, p, 0) + defer p.shutdown() svcInfo, err := p.addServiceOnPort(service, "TCP", 0, time.Second) if err != nil {