diff --git a/agent/grpc/client.go b/agent/grpc/client.go index 783cbae36e..6a3feaf7a7 100644 --- a/agent/grpc/client.go +++ b/agent/grpc/client.go @@ -61,8 +61,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) grpc.WithInsecure(), grpc.WithContextDialer(c.dialer), grpc.WithDisableRetry(), - // TODO: previously this statsHandler was shared with the Handler. Is that necessary? - grpc.WithStatsHandler(newStatsHandler()), + grpc.WithStatsHandler(newStatsHandler(defaultMetrics)), // nolint:staticcheck // there is no other supported alternative to WithBalancerName grpc.WithBalancerName("pick_first")) if err != nil { diff --git a/agent/grpc/handler.go b/agent/grpc/handler.go index e78231276d..d381b9da83 100644 --- a/agent/grpc/handler.go +++ b/agent/grpc/handler.go @@ -17,8 +17,8 @@ func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler { // We don't need to pass tls.Config to the server since it's multiplexed // behind the RPC listener, which already has TLS configured. srv := grpc.NewServer( - grpc.StatsHandler(newStatsHandler()), - grpc.StreamInterceptor((&activeStreamCounter{}).Intercept), + grpc.StatsHandler(newStatsHandler(defaultMetrics)), + grpc.StreamInterceptor((&activeStreamCounter{metrics: defaultMetrics}).Intercept), ) register(srv) diff --git a/agent/grpc/stats.go b/agent/grpc/stats.go index d25048110d..16961e7f0a 100644 --- a/agent/grpc/stats.go +++ b/agent/grpc/stats.go @@ -18,8 +18,8 @@ type statsHandler struct { activeConns uint64 // must be 8-byte aligned for atomic access } -func newStatsHandler() *statsHandler { - return &statsHandler{metrics: defaultMetrics} +func newStatsHandler(m *metrics.Metrics) *statsHandler { + return &statsHandler{metrics: m} } // TagRPC implements grpcStats.StatsHandler @@ -64,6 +64,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) { } type activeStreamCounter struct { + metrics *metrics.Metrics // count of the number of open streaming RPCs on a server. It is accessed // atomically. count uint64 @@ -78,10 +79,10 @@ func (i *activeStreamCounter) Intercept( handler grpc.StreamHandler, ) error { count := atomic.AddUint64(&i.count, 1) - defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) + i.metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) defer func() { count := atomic.AddUint64(&i.count, ^uint64(0)) - defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) + i.metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) }() return handler(srv, ss) diff --git a/agent/grpc/stats_test.go b/agent/grpc/stats_test.go index f16cfb9cbe..ea4cb70b24 100644 --- a/agent/grpc/stats_test.go +++ b/agent/grpc/stats_test.go @@ -20,10 +20,11 @@ import ( func noopRegister(*grpc.Server) {} func TestHandler_EmitsStats(t *testing.T) { - sink := patchGlobalMetrics(t) + sink, reset := patchGlobalMetrics(t) addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} handler := NewHandler(addr, noopRegister) + reset() testservice.RegisterSimpleServer(handler.srv, &simple{}) @@ -99,7 +100,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { } } -func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { +func patchGlobalMetrics(t *testing.T) (*fakeMetricsSink, func()) { t.Helper() sink := &fakeMetricsSink{} @@ -112,11 +113,12 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { var err error defaultMetrics, err = metrics.New(cfg, sink) require.NoError(t, err) - t.Cleanup(func() { - _, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{}) + reset := func() { + t.Helper() + defaultMetrics, err = metrics.New(cfg, &metrics.BlackholeSink{}) require.NoError(t, err, "failed to reset global metrics") - }) - return sink + } + return sink, reset } type fakeMetricsSink struct {