Browse Source

Misc rebase and test fixes

pull/4275/head
Paul Banks 7 years ago committed by Jack Pearkes
parent
commit
96c416012e
  1. 12
      api/agent_test.go
  2. 4
      connect/proxy/config.go
  3. 4
      connect/proxy/config_test.go
  4. 8
      connect/proxy/listener.go
  5. 9
      connect/proxy/listener_test.go
  6. 14
      testutil/server.go

12
api/agent_test.go

@ -1124,12 +1124,8 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) { c, s := makeClientWithConfig(t, nil, func(c *testutil.TestServerConfig) {
// Force auto port range to 1 port so we have deterministic response. // Force auto port range to 1 port so we have deterministic response.
c.Connect = map[string]interface{}{ c.Ports.ProxyMinPort = 20000
"proxy_defaults": map[string]interface{}{ c.Ports.ProxyMaxPort = 20000
"bind_min_port": 20000,
"bind_max_port": 20000,
},
}
}) })
defer s.Stop() defer s.Stop()
@ -1165,6 +1161,10 @@ func TestAPI_AgentConnectProxyConfig(t *testing.T) {
"bind_port": float64(20000), "bind_port": float64(20000),
"foo": "bar", "foo": "bar",
"local_service_address": "127.0.0.1:8000", "local_service_address": "127.0.0.1:8000",
"telemetry": map[string]interface{}{
"FilterDefault": true,
"MetricsPrefix": "consul.proxy.foo",
},
}, },
} }
require.Equal(t, expectConfig, config) require.Equal(t, expectConfig, config)

4
connect/proxy/config.go

@ -37,8 +37,8 @@ type Config struct {
// Upstreams configures outgoing proxies for remote connect services. // Upstreams configures outgoing proxies for remote connect services.
Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"` Upstreams []UpstreamConfig `json:"upstreams" hcl:"upstreams"`
// Telemetry stores configuration to configure go-metrics. It is typically // Telemetry stores configuration for go-metrics. It is typically populated
// passed the Telemetry block from the agent's config verbatim so that the // from the agent's runtime config via the proxy config endpoint so that the
// proxy will log metrics to the same location(s) as the agent. // proxy will log metrics to the same location(s) as the agent.
Telemetry map[string]interface{} Telemetry map[string]interface{}
} }

4
connect/proxy/config_test.go

@ -175,6 +175,10 @@ func TestAgentConfigWatcher(t *testing.T) {
ConnectTimeoutMs: 10000, // from applyDefaults ConnectTimeoutMs: 10000, // from applyDefaults
}, },
}, },
Telemetry: map[string]interface{}{
"FilterDefault": true,
"MetricsPrefix": "consul.proxy.web",
},
} }
assert.Equal(t, expectCfg, cfg) assert.Equal(t, expectCfg, cfg)

8
connect/proxy/listener.go

@ -160,6 +160,11 @@ func (l *Listener) handleConn(src net.Conn) {
// it closes. // it closes.
defer l.trackConn()() defer l.trackConn()()
// Make sure Close() waits for this conn to be cleaned up. Note defer is
// before conn.Close() so runs after defer conn.Close().
l.connWG.Add(1)
defer l.connWG.Done()
// Note no need to defer dst.Close() since conn handles that for us. // Note no need to defer dst.Close() since conn handles that for us.
conn := NewConn(src, dst) conn := NewConn(src, dst)
defer conn.Close() defer conn.Close()
@ -171,7 +176,6 @@ func (l *Listener) handleConn(src net.Conn) {
err = conn.CopyBytes() err = conn.CopyBytes()
if err != nil { if err != nil {
l.logger.Printf("[ERR] connection failed: %s", err) l.logger.Printf("[ERR] connection failed: %s", err)
return
} }
close(connStop) close(connStop)
}() }()
@ -215,13 +219,11 @@ func (l *Listener) handleConn(src net.Conn) {
// trackConn increments the count of active conns and returns a func() that can // trackConn increments the count of active conns and returns a func() that can
// be deferred on to decrement the counter again on connection close. // be deferred on to decrement the counter again on connection close.
func (l *Listener) trackConn() func() { func (l *Listener) trackConn() func() {
l.connWG.Add(1)
c := atomic.AddInt64(&l.activeConns, 1) c := atomic.AddInt64(&l.activeConns, 1)
metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c), metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c),
l.metricLabels) l.metricLabels)
return func() { return func() {
l.connWG.Done()
c := atomic.AddInt64(&l.activeConns, -1) c := atomic.AddInt64(&l.activeConns, -1)
metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c), metrics.SetGaugeWithLabels([]string{l.metricPrefix, "conns"}, float32(c),
l.metricLabels) l.metricLabels)

9
connect/proxy/listener_test.go

@ -20,8 +20,8 @@ import (
) )
func testSetupMetrics(t *testing.T) *metrics.InmemSink { func testSetupMetrics(t *testing.T) *metrics.InmemSink {
// Record for ages so we can be confident that our assertions won't fail on // Record for ages (5 mins) so we can be confident that our assertions won't
// silly long test runs due to dropped data. // fail on silly long test runs due to dropped data.
s := metrics.NewInmemSink(10*time.Second, 300*time.Second) s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
cfg := metrics.DefaultConfig("consul.proxy.test") cfg := metrics.DefaultConfig("consul.proxy.test")
cfg.EnableHostname = false cfg.EnableHostname = false
@ -88,7 +88,7 @@ func assertAllTimeCounterValue(t *testing.T, sink *metrics.InmemSink,
} }
func TestPublicListener(t *testing.T) { func TestPublicListener(t *testing.T) {
t.Parallel() // Can't enable t.Parallel since we rely on the global metrics instance.
ca := agConnect.TestCA(t, nil) ca := agConnect.TestCA(t, nil)
ports := freeport.GetT(t, 1) ports := freeport.GetT(t, 1)
@ -141,7 +141,7 @@ func TestPublicListener(t *testing.T) {
} }
func TestUpstreamListener(t *testing.T) { func TestUpstreamListener(t *testing.T) {
t.Parallel() // Can't enable t.Parallel since we rely on the global metrics instance.
ca := agConnect.TestCA(t, nil) ca := agConnect.TestCA(t, nil)
ports := freeport.GetT(t, 1) ports := freeport.GetT(t, 1)
@ -188,6 +188,7 @@ func TestUpstreamListener(t *testing.T) {
conn, err := net.Dial("tcp", conn, err := net.Dial("tcp",
fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort)) fmt.Sprintf("%s:%d", cfg.LocalBindAddress, cfg.LocalBindPort))
require.NoError(t, err) require.NoError(t, err)
TestEchoConn(t, conn, "") TestEchoConn(t, conn, "")
// Check active conn is tracked in gauges // Check active conn is tracked in gauges

14
testutil/server.go

@ -43,12 +43,14 @@ type TestPerformanceConfig struct {
// TestPortConfig configures the various ports used for services // TestPortConfig configures the various ports used for services
// provided by the Consul server. // provided by the Consul server.
type TestPortConfig struct { type TestPortConfig struct {
DNS int `json:"dns,omitempty"` DNS int `json:"dns,omitempty"`
HTTP int `json:"http,omitempty"` HTTP int `json:"http,omitempty"`
HTTPS int `json:"https,omitempty"` HTTPS int `json:"https,omitempty"`
SerfLan int `json:"serf_lan,omitempty"` SerfLan int `json:"serf_lan,omitempty"`
SerfWan int `json:"serf_wan,omitempty"` SerfWan int `json:"serf_wan,omitempty"`
Server int `json:"server,omitempty"` Server int `json:"server,omitempty"`
ProxyMinPort int `json:"proxy_min_port,omitempty"`
ProxyMaxPort int `json:"proxy_max_port,omitempty"`
} }
// TestAddressConfig contains the bind addresses for various // TestAddressConfig contains the bind addresses for various

Loading…
Cancel
Save