agent/grpc: make TestHandler_EmitsStats predictable

Occasionally this test would flake. The flakes were fixed by:

1. Stopping the service and retrying to check on metrics. This way we
   also include the active_streams going to 0 in the metric calls.

2. Using a reference to the global Metrics. This way when other tests
   have background goroutines that are still shutting down, they won't
   emit metrics to the metric instance with the fake Sink. The stats
   test can patch the local reference to the global, so the existing
   statHandlers will continue to emit to the global, but the stats
   test will send all metrics to the replacement.
pull/8679/head
Daniel Nephin 2020-09-14 13:16:13 -04:00
parent 0c87cf468c
commit 636f76f6f1
4 changed files with 65 additions and 33 deletions

View File

@ -15,7 +15,7 @@ func NewHandler(addr net.Addr) *Handler {
// We don't need to pass tls.Config to the server since it's multiplexed // We don't need to pass tls.Config to the server since it's multiplexed
// behind the RPC listener, which already has TLS configured. // behind the RPC listener, which already has TLS configured.
srv := grpc.NewServer( srv := grpc.NewServer(
grpc.StatsHandler(&statsHandler{}), grpc.StatsHandler(newStatsHandler()),
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept), grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
) )

28
agent/grpc/server_test.go Normal file
View File

@ -0,0 +1,28 @@
package grpc
import (
"context"
"time"
"github.com/hashicorp/consul/agent/grpc/internal/testservice"
)
type simple struct {
name string
dc string
}
func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
for flow.Context().Err() == nil {
resp := &testservice.Resp{ServerName: "one", Datacenter: s.dc}
if err := flow.Send(resp); err != nil {
return err
}
time.Sleep(time.Millisecond)
}
return nil
}
func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
return &testservice.Resp{ServerName: s.name, Datacenter: s.dc}, nil
}

View File

@ -9,12 +9,19 @@ import (
"google.golang.org/grpc/stats" "google.golang.org/grpc/stats"
) )
var defaultMetrics = metrics.Default()
// statsHandler is a grpc/stats.StatsHandler which emits connection and // statsHandler is a grpc/stats.StatsHandler which emits connection and
// request metrics to go-metrics. // request metrics to go-metrics.
type statsHandler struct { type statsHandler struct {
metrics *metrics.Metrics
activeConns uint64 // must be 8-byte aligned for atomic access activeConns uint64 // must be 8-byte aligned for atomic access
} }
func newStatsHandler() *statsHandler {
return &statsHandler{metrics: defaultMetrics}
}
// TagRPC implements grpcStats.StatsHandler // TagRPC implements grpcStats.StatsHandler
func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { func (c *statsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
// No-op // No-op
@ -29,7 +36,7 @@ func (c *statsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
} }
switch s.(type) { switch s.(type) {
case *stats.InHeader: case *stats.InHeader:
metrics.IncrCounter([]string{"grpc", label, "request"}, 1) c.metrics.IncrCounter([]string{"grpc", label, "request"}, 1)
} }
} }
@ -53,7 +60,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
// Decrement! // Decrement!
count = atomic.AddUint64(&c.activeConns, ^uint64(0)) count = atomic.AddUint64(&c.activeConns, ^uint64(0))
} }
metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count)) c.metrics.SetGauge([]string{"grpc", label, "active_conns"}, float32(count))
} }
type activeStreamCounter struct { type activeStreamCounter struct {
@ -71,10 +78,10 @@ func (i *activeStreamCounter) Intercept(
handler grpc.StreamHandler, handler grpc.StreamHandler,
) error { ) error {
count := atomic.AddUint64(&i.count, 1) count := atomic.AddUint64(&i.count, 1)
metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
defer func() { defer func() {
count := atomic.AddUint64(&i.count, ^uint64(0)) count := atomic.AddUint64(&i.count, ^uint64(0))
metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
}() }()
return handler(srv, ss) return handler(srv, ss)

View File

@ -8,6 +8,7 @@ import (
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/grpc/internal/testservice" "github.com/hashicorp/consul/agent/grpc/internal/testservice"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -18,12 +19,11 @@ func TestHandler_EmitsStats(t *testing.T) {
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
handler := NewHandler(addr) handler := NewHandler(addr)
testservice.RegisterSimpleServer(handler.srv, &simple{}) testservice.RegisterSimpleServer(handler.srv, &simple{})
lis, err := net.Listen("tcp", "127.0.0.1:0") lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err) require.NoError(t, err)
defer lis.Close() t.Cleanup(logError(t, lis.Close))
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel) t.Cleanup(cancel)
@ -43,7 +43,7 @@ func TestHandler_EmitsStats(t *testing.T) {
conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure())
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() t.Cleanup(logError(t, conn.Close))
client := testservice.NewSimpleClient(conn) client := testservice.NewSimpleClient(conn)
fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"}) fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"})
@ -53,36 +53,24 @@ func TestHandler_EmitsStats(t *testing.T) {
_, err = fClient.Recv() _, err = fClient.Recv()
require.NoError(t, err) require.NoError(t, err)
cancel()
// Wait for the server to stop so that active_streams is predictable.
retry.RunWith(fastRetry, t, func(r *retry.R) {
expectedGauge := []metricCall{
{key: []string{"testing", "grpc", "server", "active_conns"}, val: 1},
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 1},
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 0},
}
require.Equal(r, expectedGauge, sink.gaugeCalls)
})
expectedCounter := []metricCall{ expectedCounter := []metricCall{
{key: []string{"testing", "grpc", "server", "request"}, val: 1}, {key: []string{"testing", "grpc", "server", "request"}, val: 1},
} }
require.Equal(t, expectedCounter, sink.incrCounterCalls) require.Equal(t, expectedCounter, sink.incrCounterCalls)
expectedGauge := []metricCall{
{key: []string{"testing", "grpc", "server", "active_conns"}, val: 1},
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 1},
// TODO: why is the count reset to 0 before the client receives the second message?
{key: []string{"testing", "grpc", "server", "active_streams"}, val: 0},
}
require.Equal(t, expectedGauge, sink.gaugeCalls)
} }
type simple struct { var fastRetry = &retry.Timer{Timeout: 7 * time.Second, Wait: 2 * time.Millisecond}
name string
}
func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error {
if err := flow.Send(&testservice.Resp{ServerName: "one"}); err != nil {
return err
}
if err := flow.Send(&testservice.Resp{ServerName: "two"}); err != nil {
return err
}
return nil
}
func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) {
return &testservice.Resp{ServerName: "the-fake-service-name"}, nil
}
func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
t.Helper() t.Helper()
@ -94,7 +82,8 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
ProfileInterval: time.Second, // Poll runtime every second ProfileInterval: time.Second, // Poll runtime every second
FilterDefault: true, FilterDefault: true,
} }
_, err := metrics.NewGlobal(cfg, sink) var err error
defaultMetrics, err = metrics.New(cfg, sink)
require.NoError(t, err) require.NoError(t, err)
t.Cleanup(func() { t.Cleanup(func() {
_, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{}) _, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{})
@ -122,3 +111,11 @@ type metricCall struct {
val float32 val float32
labels []metrics.Label labels []metrics.Label
} }
func logError(t *testing.T, f func() error) func() {
return func() {
if err := f(); err != nil {
t.Logf(err.Error())
}
}
}