diff --git a/api/agent_test.go b/api/agent_test.go index e29643b45c..00caaf8880 100644 --- a/api/agent_test.go +++ b/api/agent_test.go @@ -1124,12 +1124,8 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) { t.Parallel() c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { // Force auto port range to 1 port so we have deterministic response. - c.Connect = map[string]interface{}{ - "proxy_defaults": map[string]interface{}{ - "bind_min_port": 20000, - "bind_max_port": 20000, - }, - } + c.Ports.ProxyMinPort = 20000 + c.Ports.ProxyMaxPort = 20000 }) defer s.Stop() @@ -1165,6 +1161,10 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) { "bind_port": float64(20000), "foo": "bar", "local_service_address": "127.0.0.1:8000", + "telemetry": map[string]interface{}{ + "FilterDefault": true, + "MetricsPrefix": "consul.proxy.foo", + }, }, } require.Equal(t, expectConfig, config) diff --git a/connect/proxy/config.go b/connect/proxy/config.go index eee7c8ee82..d98c2d633d 100644 --- a/connect/proxy/config.go +++ b/connect/proxy/config.go @@ -37,8 +37,8 @@ type Config struct { // Upstreams configures outgoing proxies for remote connect services. Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"` - // Telemetry stores configuration to configure go-metrics. It is typically - // passed the Telemetry block from the agent's config verbatim so that the + // Telemetry stores configuration for go-metrics. It is typically populated + // from the agent's runtime config via the proxy config endpoint so that the // proxy will log metrics to the same location(s) as the agent. Telemetry map[string]interface{} } diff --git a/connect/proxy/config_test.go b/connect/proxy/config_test.go index 34080fb5a6..0f92bac38f 100644 --- a/connect/proxy/config_test.go +++ b/connect/proxy/config_test.go @@ -175,6 +175,10 @@ func TestAgentConfigWatcher(t *testing.T) { ConnectTimeoutMs: 10000, // from applyDefaults }, }, + Telemetry: map[string]interface{}{ + "FilterDefault": true, + "MetricsPrefix": "consul.proxy.web", + }, } assert.Equal(t, expectCfg, cfg) diff --git a/connect/proxy/listener.go b/connect/proxy/listener.go index a6b9a75e85..7e5d0f87e8 100644 --- a/connect/proxy/listener.go +++ b/connect/proxy/listener.go @@ -160,6 +160,11 @@ func (l *Listener) handleConn(src net.Conn) { // it closes. defer l.trackConn()() + // Make sure Close() waits for this conn to be cleaned up. Note defer is + // before conn.Close() so runs after defer conn.Close(). + l.connWG.Add(1) + defer l.connWG.Done() + // Note no need to defer dst.Close() since conn handles that for us. conn := NewConn(src, dst) defer conn.Close() @@ -171,7 +176,6 @@ func (l *Listener) handleConn(src net.Conn) { err = conn.CopyBytes() if err != nil { l.logger.Printf("[ERR] connection failed: %s", err) - return } close(connStop) }() @@ -215,13 +219,11 @@ func (l *Listener) handleConn(src net.Conn) { // trackConn increments the count of active conns and returns a func() that can // be deferred on to decrement the counter again on connection close. func (l *Listener) trackConn() func() { - l.connWG.Add(1) c := atomic.AddInt64(&l.activeConns, 1) metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c), l.metricLabels) return func() { - l.connWG.Done() c := atomic.AddInt64(&l.activeConns, -1) metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c), l.metricLabels) diff --git a/connect/proxy/listener_test.go b/connect/proxy/listener_test.go index 773e3a2db2..17f6c379e0 100644 --- a/connect/proxy/listener_test.go +++ b/connect/proxy/listener_test.go @@ -20,8 +20,8 @@ import ( ) func testSetupMetrics(t *testing.T) *metrics.InmemSink { - // Record for ages so we can be confident that our assertions won't fail on - // silly long test runs due to dropped data. + // Record for ages (5 mins) so we can be confident that our assertions won't + // fail on silly long test runs due to dropped data. s := metrics.NewInmemSink(10*time.Second, 300*time.Second) cfg := metrics.DefaultConfig("consul.proxy.test") cfg.EnableHostname = false @@ -88,7 +88,7 @@ func assertAllTimeCounterValue(t *testing.T, sink *metrics.InmemSink, } func TestPublicListener(t *testing.T) { - t.Parallel() + // Can't enable t.Parallel since we rely on the global metrics instance. ca := agConnect.TestCA(t, nil) ports := freeport.GetT(t, 1) @@ -141,7 +141,7 @@ func TestPublicListener(t *testing.T) { } func TestUpstreamListener(t *testing.T) { - t.Parallel() + // Can't enable t.Parallel since we rely on the global metrics instance. ca := agConnect.TestCA(t, nil) ports := freeport.GetT(t, 1) @@ -188,6 +188,7 @@ func TestUpstreamListener(t *testing.T) { conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort)) require.NoError(t, err) + TestEchoConn(t, conn, "") // Check active conn is tracked in gauges diff --git a/testutil/server.go b/testutil/server.go index e80b0e7fd2..193a856a34 100644 --- a/testutil/server.go +++ b/testutil/server.go @@ -43,12 +43,14 @@ type TestPerformanceConfig struct { // TestPortConfig configures the various ports used for services // provided by the Consul server. type TestPortConfig struct { - DNS int `json:"dns,omitempty"` - HTTP int `json:"http,omitempty"` - HTTPS int `json:"https,omitempty"` - SerfLan int `json:"serf_lan,omitempty"` - SerfWan int `json:"serf_wan,omitempty"` - Server int `json:"server,omitempty"` + DNS int `json:"dns,omitempty"` + HTTP int `json:"http,omitempty"` + HTTPS int `json:"https,omitempty"` + SerfLan int `json:"serf_lan,omitempty"` + SerfWan int `json:"serf_wan,omitempty"` + Server int `json:"server,omitempty"` + ProxyMinPort int `json:"proxy_min_port,omitempty"` + ProxyMaxPort int `json:"proxy_max_port,omitempty"` } // TestAddressConfig contains the bind addresses for various