From d18a03b07f0b76f92f962154807aaddee447c750 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 28 Apr 2021 14:27:55 -0400 Subject: [PATCH] connect/proxy: fix a number of problems with Listener We noticed that TestUpstreamListener would deadlock sometimes when run with the race detector. While debugging this issue I found and fixed the following problems. 1. the net.Listener was not being closed properly when Listener.Stop was called. This caused the Listener.Serve goroutine to run forever. Fixed by storing a reference to net.Listener and closing it properly when Listener.Stop is called. 2. call connWG.Add in the correct place. WaitGroup.Add must be called before starting a goroutine, not from inside the goroutine. 3. Set metrics config EnableRuntimeMetrics to `false` so that we don't start a background goroutine in each test for no reason. There is no way to shutdown this goroutine, and it was an added distraction while debugging these timeouts. 5. two tests were calling require.NoError from a goroutine. require.NoError calls t.FailNow, which MUST be called from the main test goroutine. Instead use t.Errorf, which can be called from other goroutines and will still fail the test. 6. `assertCurrentGaugeValue` wass breaking out of a for loop, which would cause the `RWMutex.RUnlock` to be missed. Fixed by calling unlock before `break`. The core issue of a deadlock was fixed by https://github.com/armon/go-metrics/pull/124. --- connect/proxy/listener.go | 21 +++++++++++++-------- connect/proxy/listener_test.go | 12 ++++++++---- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/connect/proxy/listener.go b/connect/proxy/listener.go index 56e19297ed..ea84493db5 100644 --- a/connect/proxy/listener.go +++ b/connect/proxy/listener.go @@ -10,10 +10,11 @@ import ( "time" metrics "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/connect" "github.com/hashicorp/consul/ipaddr" - "github.com/hashicorp/go-hclog" ) const ( @@ -44,6 +45,7 @@ type Listener struct { // `connection refused`. Retry loops and sleeps are unpleasant workarounds and // this is cheap and correct. listeningChan chan struct{} + listener net.Listener logger hclog.Logger @@ -136,14 +138,15 @@ func (l *Listener) Serve() error { return errors.New("serve called on a closed listener") } - listen, err := l.listenFunc() + var err error + l.listener, err = l.listenFunc() if err != nil { return err } close(l.listeningChan) for { - conn, err := listen.Accept() + conn, err := l.listener.Accept() if err != nil { if atomic.LoadInt32(&l.stopFlag) == 1 { return nil @@ -151,6 +154,7 @@ func (l *Listener) Serve() error { return err } + l.connWG.Add(1) go l.handleConn(conn) } } @@ -158,6 +162,8 @@ func (l *Listener) Serve() error { // handleConn is the internal connection handler goroutine. func (l *Listener) handleConn(src net.Conn) { defer src.Close() + // Make sure Listener.Close waits for this conn to be cleaned up. + defer l.connWG.Done() dst, err := l.dialFunc() if err != nil { @@ -169,11 +175,6 @@ func (l *Listener) handleConn(src net.Conn) { // it closes. defer l.trackConn()() - // Make sure Close() waits for this conn to be cleaned up. Note defer is - // before conn.Close() so runs after defer conn.Close(). - l.connWG.Add(1) - defer l.connWG.Done() - // Note no need to defer dst.Close() since conn handles that for us. conn := NewConn(src, dst) defer conn.Close() @@ -246,6 +247,10 @@ func (l *Listener) Close() error { close(l.stopChan) // Wait for all conns to close l.connWG.Wait() + + if l.listener != nil { + l.listener.Close() + } } return nil } diff --git a/connect/proxy/listener_test.go b/connect/proxy/listener_test.go index fe842c4f6d..83cb399877 100644 --- a/connect/proxy/listener_test.go +++ b/connect/proxy/listener_test.go @@ -26,6 +26,7 @@ func testSetupMetrics(t *testing.T) *metrics.InmemSink { s := metrics.NewInmemSink(10*time.Second, 300*time.Second) cfg := metrics.DefaultConfig("consul.proxy.test") cfg.EnableHostname = false + cfg.EnableRuntimeMetrics = false metrics.NewGlobal(cfg, s) return s } @@ -45,6 +46,7 @@ func assertCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, currentInterval.RLock() if len(currentInterval.Gauges) > 0 { got = currentInterval.Gauges[name].Value + currentInterval.RUnlock() break } currentInterval.RUnlock() @@ -132,8 +134,9 @@ func TestPublicListener(t *testing.T) { // Run proxy go func() { - err := l.Serve() - require.NoError(t, err) + if err := l.Serve(); err != nil { + t.Errorf("failed to listen: %v", err.Error()) + } }() defer l.Close() l.Wait() @@ -200,8 +203,9 @@ func TestUpstreamListener(t *testing.T) { // Run proxy go func() { - err := l.Serve() - require.NoError(t, err) + if err := l.Serve(); err != nil { + t.Errorf("failed to listen: %v", err.Error()) + } }() defer l.Close() l.Wait()