mirror of https://github.com/hashicorp/consul
agent/grpc: pass metrics to constructor
Instead of referencing a package var. This does not fix the flaky test, but it seems more correct.pull/8961/head
parent
d19657404f
commit
8a785a351c
|
@ -61,8 +61,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error)
|
||||||
grpc.WithInsecure(),
|
grpc.WithInsecure(),
|
||||||
grpc.WithContextDialer(c.dialer),
|
grpc.WithContextDialer(c.dialer),
|
||||||
grpc.WithDisableRetry(),
|
grpc.WithDisableRetry(),
|
||||||
// TODO: previously this statsHandler was shared with the Handler. Is that necessary?
|
grpc.WithStatsHandler(newStatsHandler(defaultMetrics)),
|
||||||
grpc.WithStatsHandler(newStatsHandler()),
|
|
||||||
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
|
// nolint:staticcheck // there is no other supported alternative to WithBalancerName
|
||||||
grpc.WithBalancerName("pick_first"))
|
grpc.WithBalancerName("pick_first"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -17,8 +17,8 @@ func NewHandler(addr net.Addr, register func(server *grpc.Server)) *Handler {
|
||||||
// We don't need to pass tls.Config to the server since it's multiplexed
|
// We don't need to pass tls.Config to the server since it's multiplexed
|
||||||
// behind the RPC listener, which already has TLS configured.
|
// behind the RPC listener, which already has TLS configured.
|
||||||
srv := grpc.NewServer(
|
srv := grpc.NewServer(
|
||||||
grpc.StatsHandler(newStatsHandler()),
|
grpc.StatsHandler(newStatsHandler(defaultMetrics)),
|
||||||
grpc.StreamInterceptor((&activeStreamCounter{}).Intercept),
|
grpc.StreamInterceptor((&activeStreamCounter{metrics: defaultMetrics}).Intercept),
|
||||||
)
|
)
|
||||||
register(srv)
|
register(srv)
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,8 @@ type statsHandler struct {
|
||||||
activeConns uint64 // must be 8-byte aligned for atomic access
|
activeConns uint64 // must be 8-byte aligned for atomic access
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStatsHandler() *statsHandler {
|
func newStatsHandler(m *metrics.Metrics) *statsHandler {
|
||||||
return &statsHandler{metrics: defaultMetrics}
|
return &statsHandler{metrics: m}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TagRPC implements grpcStats.StatsHandler
|
// TagRPC implements grpcStats.StatsHandler
|
||||||
|
@ -64,6 +64,7 @@ func (c *statsHandler) HandleConn(_ context.Context, s stats.ConnStats) {
|
||||||
}
|
}
|
||||||
|
|
||||||
type activeStreamCounter struct {
|
type activeStreamCounter struct {
|
||||||
|
metrics *metrics.Metrics
|
||||||
// count of the number of open streaming RPCs on a server. It is accessed
|
// count of the number of open streaming RPCs on a server. It is accessed
|
||||||
// atomically.
|
// atomically.
|
||||||
count uint64
|
count uint64
|
||||||
|
@ -78,10 +79,10 @@ func (i *activeStreamCounter) Intercept(
|
||||||
handler grpc.StreamHandler,
|
handler grpc.StreamHandler,
|
||||||
) error {
|
) error {
|
||||||
count := atomic.AddUint64(&i.count, 1)
|
count := atomic.AddUint64(&i.count, 1)
|
||||||
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
|
i.metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
|
||||||
defer func() {
|
defer func() {
|
||||||
count := atomic.AddUint64(&i.count, ^uint64(0))
|
count := atomic.AddUint64(&i.count, ^uint64(0))
|
||||||
defaultMetrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
|
i.metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return handler(srv, ss)
|
return handler(srv, ss)
|
||||||
|
|
|
@ -20,10 +20,11 @@ import (
|
||||||
func noopRegister(*grpc.Server) {}
|
func noopRegister(*grpc.Server) {}
|
||||||
|
|
||||||
func TestHandler_EmitsStats(t *testing.T) {
|
func TestHandler_EmitsStats(t *testing.T) {
|
||||||
sink := patchGlobalMetrics(t)
|
sink, reset := patchGlobalMetrics(t)
|
||||||
|
|
||||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||||
handler := NewHandler(addr, noopRegister)
|
handler := NewHandler(addr, noopRegister)
|
||||||
|
reset()
|
||||||
|
|
||||||
testservice.RegisterSimpleServer(handler.srv, &simple{})
|
testservice.RegisterSimpleServer(handler.srv, &simple{})
|
||||||
|
|
||||||
|
@ -99,7 +100,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
|
func patchGlobalMetrics(t *testing.T) (*fakeMetricsSink, func()) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
sink := &fakeMetricsSink{}
|
sink := &fakeMetricsSink{}
|
||||||
|
@ -112,11 +113,12 @@ func patchGlobalMetrics(t *testing.T) *fakeMetricsSink {
|
||||||
var err error
|
var err error
|
||||||
defaultMetrics, err = metrics.New(cfg, sink)
|
defaultMetrics, err = metrics.New(cfg, sink)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
t.Cleanup(func() {
|
reset := func() {
|
||||||
_, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{})
|
t.Helper()
|
||||||
|
defaultMetrics, err = metrics.New(cfg, &metrics.BlackholeSink{})
|
||||||
require.NoError(t, err, "failed to reset global metrics")
|
require.NoError(t, err, "failed to reset global metrics")
|
||||||
})
|
}
|
||||||
return sink
|
return sink, reset
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeMetricsSink struct {
|
type fakeMetricsSink struct {
|
||||||
|
|
Loading…
Reference in New Issue