xds: emit a labeled gauge of connected xDS streams by version (#10243)

Fixes #10099
pull/10230/head
R.B. Boyer 2021-05-14 13:59:13 -05:00 committed by GitHub
parent 597448da47
commit ede14b7c54
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 88 additions and 1 deletions

3
.changelog/10243.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
xds: emit a labeled gauge of connected xDS streams by version
```

View File

@ -26,6 +26,7 @@ import (
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/agent/xds"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
@ -195,6 +196,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig) ([]prometheus.GaugeDefinition, [
consul.RPCGauges, consul.RPCGauges,
consul.SessionGauges, consul.SessionGauges,
grpc.StatsGauges, grpc.StatsGauges,
xds.StatsGauges,
usagemetrics.Gauges, usagemetrics.Gauges,
consul.ReplicationGauges, consul.ReplicationGauges,
Gauges, Gauges,

View File

@ -31,6 +31,8 @@ type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggrega
// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer // DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
defer s.activeStreams.Increment("v3")()
// a channel for receiving incoming requests // a channel for receiving incoming requests
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest) reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
reqStop := int32(0) reqStop := int32(0)

View File

@ -59,6 +59,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Check no response sent yet // Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream) // Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)

View File

@ -11,6 +11,8 @@ import (
envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" envoy_discovery_v2 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -25,6 +27,13 @@ import (
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
) )
var StatsGauges = []prometheus.GaugeDefinition{
{
Name: []string{"xds", "server", "streams"},
Help: "Measures the number of active xDS streams handled by the server split by protocol version.",
},
}
// ADSStream is a shorter way of referring to this thing... // ADSStream is a shorter way of referring to this thing...
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer type ADSStream_v2 = envoy_discovery_v2.AggregatedDiscoveryService_StreamAggregatedResourcesServer
@ -141,6 +150,36 @@ type Server struct {
AuthCheckFrequency time.Duration AuthCheckFrequency time.Duration
DisableV2Protocol bool DisableV2Protocol bool
activeStreams activeStreamCounters
}
// activeStreamCounters simply encapsulates two counters accessed atomically to
// ensure alignment is correct.
type activeStreamCounters struct {
xDSv3 uint64
xDSv2 uint64
}
func (c *activeStreamCounters) Increment(xdsVersion string) func() {
var counter *uint64
switch xdsVersion {
case "v3":
counter = &c.xDSv3
case "v2":
counter = &c.xDSv2
default:
return func() {}
}
labels := []metrics.Label{{Name: "version", Value: xdsVersion}}
count := atomic.AddUint64(counter, 1)
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels)
return func() {
count := atomic.AddUint64(counter, ^uint64(0))
metrics.SetGaugeWithLabels([]string{"xds", "server", "streams"}, float32(count), labels)
}
} }
func NewServer( func NewServer(
@ -171,6 +210,8 @@ func (s *Server) StreamAggregatedResources(stream ADSStream) error {
// Deprecated: remove when xDS v2 is no longer supported // Deprecated: remove when xDS v2 is no longer supported
func (s *Server) streamAggregatedResources(stream ADSStream) error { func (s *Server) streamAggregatedResources(stream ADSStream) error {
defer s.activeStreams.Increment("v2")()
// Note: despite dealing entirely in v3 protobufs, this function is // Note: despite dealing entirely in v3 protobufs, this function is
// exclusively used from the xDS v2 shim RPC handler, so the logging below // exclusively used from the xDS v2 shim RPC handler, so the logging below
// will refer to it as "v2". // will refer to it as "v2".

View File

@ -43,6 +43,8 @@ func TestServer_StreamAggregatedResources_v2_BasicProtocol_TCP(t *testing.T) {
// Check no response sent yet // Check no response sent yet
assertChanBlocked(t, envoy.stream.sendCh) assertChanBlocked(t, envoy.stream.sendCh)
requireProtocolVersionGauge(t, scenario, "v2", 1)
// Deliver a new snapshot // Deliver a new snapshot
snap := newTestSnapshot(t, nil, "") snap := newTestSnapshot(t, nil, "")
mgr.DeliverConfig(t, sid, snap) mgr.DeliverConfig(t, sid, snap)

View File

@ -19,6 +19,7 @@ import (
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3" envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/armon/go-metrics"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/wrappers" "github.com/golang/protobuf/ptypes/wrappers"
@ -118,6 +119,7 @@ type testServerScenario struct {
server *Server server *Server
mgr *testManager mgr *testManager
envoy *TestEnvoy envoy *TestEnvoy
sink *metrics.InmemSink
errCh <-chan error errCh <-chan error
} }
@ -155,6 +157,17 @@ func newTestServerScenarioInner(
envoy.Close() envoy.Close()
}) })
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
cfg := metrics.DefaultConfig("consul.xds.test")
cfg.EnableHostname = false
cfg.EnableRuntimeMetrics = false
metrics.NewGlobal(cfg, sink)
t.Cleanup(func() {
sink := &metrics.BlackholeSink{}
metrics.NewGlobal(cfg, sink)
})
s := NewServer( s := NewServer(
testutil.Logger(t), testutil.Logger(t),
mgr, mgr,
@ -178,6 +191,7 @@ func newTestServerScenarioInner(
server: s, server: s,
mgr: mgr, mgr: mgr,
envoy: envoy, envoy: envoy,
sink: sink,
errCh: errCh, errCh: errCh,
} }
} }
@ -647,3 +661,23 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.FailNow() t.FailNow()
} }
} }
func requireProtocolVersionGauge(
t *testing.T,
scenario *testServerScenario,
xdsVersion string,
expected int,
) {
data := scenario.sink.Data()
require.Len(t, data, 1)
item := data[0]
require.Len(t, item.Gauges, 1)
val, ok := item.Gauges["consul.xds.test.xds.server.streams;version="+xdsVersion]
require.True(t, ok)
require.Equal(t, "consul.xds.test.xds.server.streams", val.Name)
require.Equal(t, expected, int(val.Value))
require.Equal(t, []metrics.Label{{Name: "version", Value: xdsVersion}}, val.Labels)
}

View File

@ -417,7 +417,8 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.grpc.server.connection.count` | Counts the number of new gRPC connections received by the server. | connections | counter | | `consul.grpc.server.connection.count` | Counts the number of new gRPC connections received by the server. | connections | counter |
| `consul.grpc.server.connections` | Measures the number of active gRPC connections open on the server. | connections | gauge | | `consul.grpc.server.connections` | Measures the number of active gRPC connections open on the server. | connections | gauge |
| `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter | | `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter |
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | guage | | `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge |
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |
## Cluster Health ## Cluster Health