From 5b2300e63fec172b6d465304cfb423b5b6f9f8ef Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 13 Nov 2020 13:27:27 -0500
Subject: [PATCH 1/6] docs: Fix reference to dns_config.use_cache

---
 website/content/docs/agent/options.mdx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/website/content/docs/agent/options.mdx b/website/content/docs/agent/options.mdx
index 28b1a0023d..1f274187d4 100644
--- a/website/content/docs/agent/options.mdx
+++ b/website/content/docs/agent/options.mdx
@@ -2213,7 +2213,7 @@ bind_addr = "{{ GetPrivateInterfaces | include \"network\" \"10.0.0.0/8\" | attr
   streaming rpc, instead of the traditional blocking queries, for endpoints
   which support streaming. All servers must have
   [`rpc.enable_streaming`](#rpc_enable_streaming) enabled before any client can enable `use_streaming_backend`.
-  At least one of [`dns.use_cache`](#dns_use_cache) or
+  At least one of [`dns_config.use_cache`](#dns_use_cache) or
   [`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise
   this setting has no effect. `use_streaming_backend` will default to true in a
   future version of Consul.

From 1d2d15b1e135faa2efc553dcd6bb41855d9c0d2a Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 13 Nov 2020 15:05:16 -0500
Subject: [PATCH 2/6] agent: add a test for streaming in the service health
 endpoint

Co-authored-by: Paul Banks
---
 agent/health_endpoint_test.go | 176 +++++++++++++++++++++++++++++++++-
 agent/rpc/subscribe/logger.go |   2 +-
 agent/testagent.go            |   6 +-
 3 files changed, 176 insertions(+), 8 deletions(-)

diff --git a/agent/health_endpoint_test.go b/agent/health_endpoint_test.go
index 975c42d556..d54f464614 100644
--- a/agent/health_endpoint_test.go
+++ b/agent/health_endpoint_test.go
@@ -8,15 +8,19 @@ import (
 	"net/http/httptest"
 	"net/url"
 	"reflect"
+	"strconv"
 	"testing"
+	"time"
+
+	"github.com/armon/go-metrics"
+	"github.com/hashicorp/serf/coordinate"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
-	"github.com/hashicorp/serf/coordinate"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 )
 
 func TestHealthChecksInState(t *testing.T) {
@@ -716,6 +720,12 @@ func TestHealthServiceNodes(t *testing.T) {
 		if len(nodes) != 2 {
 			r.Fatalf("Want 2 nodes")
 		}
+		header := resp.Header().Get("X-Consul-Index")
+		if header == "" || header == "0" {
+			r.Fatalf("Want non-zero header: %q", header)
+		}
+		_, err = strconv.ParseUint(header, 10, 64)
+		r.Check(err)
 
 		// Should be a cache hit! The data should've updated in the cache
 		// in the background so this should've been fetched directly from
@@ -727,6 +737,166 @@ func TestHealthServiceNodes(t *testing.T) {
 	}
 }
 
+func TestHealthServiceNodes_Blocking(t *testing.T) {
+	cases := []struct {
+		name        string
+		hcl         string
+		grpcMetrics bool
+	}{
+		{name: "no streaming"},
+		{
+			name:        "streaming",
+			grpcMetrics: true,
+			hcl: `
+rpc { enable_streaming = true }
+use_streaming_backend = true
+`,
+		},
+	}
+
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+
+			sink := metrics.NewInmemSink(5*time.Second, time.Minute)
+			metrics.NewGlobal(&metrics.Config{
+				ServiceName:     "testing",
+				AllowedPrefixes: []string{"testing.grpc."},
+			}, sink)
+
+			a := NewTestAgent(t, tc.hcl)
+			defer a.Shutdown()
+			testrpc.WaitForTestAgent(t, a.RPC, "dc1")
+
+			// Register some initial service instances
+			for i := 0; i < 2; i++ {
+				args := &structs.RegisterRequest{
+					Datacenter: "dc1",
+					Node:       "bar",
+					Address:    "127.0.0.1",
+					Service: &structs.NodeService{
+						ID:      fmt.Sprintf("test%03d", i),
+						Service: "test",
+					},
+				}
+
+				var out struct{}
+				require.NoError(t, a.RPC("Catalog.Register", args, &out))
+			}
+
+			// Initial request should return two instances
+			req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil)
+			resp := httptest.NewRecorder()
+			obj, err := a.srv.HealthServiceNodes(resp, req)
+			require.NoError(t, err)
+
+			nodes := obj.(structs.CheckServiceNodes)
+			require.Len(t, nodes, 2)
+
+			idx := getIndex(t, resp)
+			require.True(t, idx > 0)
+
+			// errCh collects errors from goroutines since it's unsafe for them to use
+			// t to fail tests directly.
+			errCh := make(chan error, 1)
+
+			checkErrs := func() {
+				// Ensure no errors were sent on errCh and drain any nils we have
+				for {
+					select {
+					case err := <-errCh:
+						require.NoError(t, err)
+					default:
+						return
+					}
+				}
+			}
+
+			// Blocking on that index should block. We test that by launching another
+			// goroutine that waits a while before updating the registration, then
+			// checking that we unblock before the timeout, see the update, and take
+			// at least as long as the sleep time.
+			sleep := 200 * time.Millisecond
+			start := time.Now()
+			go func() {
+				time.Sleep(sleep)
+
+				args := &structs.RegisterRequest{
+					Datacenter: "dc1",
+					Node:       "zoo",
+					Address:    "127.0.0.3",
+					Service: &structs.NodeService{
+						ID:      "test",
+						Service: "test",
+					},
+				}
+
+				var out struct{}
+				errCh <- a.RPC("Catalog.Register", args, &out)
+			}()
+
+			{
+				timeout := 30 * time.Second
+				url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", idx, timeout)
+				req, _ := http.NewRequest("GET", url, nil)
+				resp := httptest.NewRecorder()
+				obj, err := a.srv.HealthServiceNodes(resp, req)
+				require.NoError(t, err)
+				elapsed := time.Since(start)
+				require.True(t, elapsed > sleep, "request should block for at "+
+					"least as long as sleep. sleep=%s, elapsed=%s", sleep, elapsed)
+
+				require.True(t, elapsed < timeout, "request should unblock before"+
+					" it timed out. timeout=%s, elapsed=%s", timeout, elapsed)
+
+				nodes := obj.(structs.CheckServiceNodes)
+				require.Len(t, nodes, 3)
+
+				newIdx := getIndex(t, resp)
+				require.True(t, idx < newIdx, "index should have increased. "+
+					"idx=%d, newIdx=%d", idx, newIdx)
+
+				idx = newIdx
+
+				checkErrs()
+			}
+
+			// Blocking should last until timeout in absence of updates
+			start = time.Now()
+			{
+				timeout := 200 * time.Millisecond
+				url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s",
+					idx, timeout)
+				req, _ := http.NewRequest("GET", url, nil)
+				resp := httptest.NewRecorder()
+				obj, err := a.srv.HealthServiceNodes(resp, req)
+				require.NoError(t, err)
+				elapsed := time.Since(start)
+				// Note that servers add jitter to timeout requested but don't remove it
+				// so this should always be true.
+				require.True(t, elapsed > timeout, "request should block for at "+
+					"least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed)
+
+				nodes := obj.(structs.CheckServiceNodes)
+				require.Len(t, nodes, 3)
+
+				newIdx := getIndex(t, resp)
+				require.Equal(t, idx, newIdx)
+			}
+
+			if tc.grpcMetrics {
+				data := sink.Data()
+				if l := len(data); l < 1 {
+					t.Errorf("expected at least 1 metrics interval, got: %v", l)
+				}
+				if count := len(data[0].Gauges); count < 2 {
+					t.Errorf("expected at least 2 grpc gauge metrics, got: %v", count)
+				}
+			}
+		})
+	}
+}
+
 func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
 	if testing.Short() {
 		t.Skip("too slow for testing.Short")
diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go
index 99394f5465..693c8604af 100644
--- a/agent/rpc/subscribe/logger.go
+++ b/agent/rpc/subscribe/logger.go
@@ -43,7 +43,7 @@ func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
 		"dc", req.Datacenter,
 		"key", req.Key,
 		"namespace", req.Namespace,
-		"index", req.Index,
+		"request_index", req.Index,
 		"stream_id", &streamID{})
 }
 
diff --git a/agent/testagent.go b/agent/testagent.go
index c58c6a6c5d..3bbfe0cbe3 100644
--- a/agent/testagent.go
+++ b/agent/testagent.go
@@ -21,8 +21,6 @@ import (
 	uuid "github.com/hashicorp/go-uuid"
 	"github.com/stretchr/testify/require"
 
-	"github.com/hashicorp/consul/sdk/testutil"
-
 	"github.com/hashicorp/consul/acl"
 	"github.com/hashicorp/consul/agent/config"
 	"github.com/hashicorp/consul/agent/connect"
@@ -30,6 +28,7 @@ import (
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/sdk/freeport"
+	"github.com/hashicorp/consul/sdk/testutil"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/tlsutil"
 )
@@ -55,8 +54,7 @@ type TestAgent struct {
 	// when Shutdown() is called.
 	Config *config.RuntimeConfig
 
-	// LogOutput is the sink for the logs. If nil, logs are written
-	// to os.Stderr.
+	// LogOutput is the sink for the logs. If nil, logs are written to os.Stderr.
 	LogOutput io.Writer
 
 	// DataDir may be set to a directory which exists. If it is not set,

From b7f8e3bad2b46c89d2523f40c5f34887b9d83c40 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 13 Nov 2020 19:25:28 -0500
Subject: [PATCH 3/6] state: Add a test for ServiceHealthSnapshot

---
 agent/consul/state/catalog.go             |  4 +
 agent/consul/state/catalog_events.go      |  2 -
 agent/consul/state/catalog_events_test.go | 96 ++++++++++++++++++++++-
 3 files changed, 98 insertions(+), 4 deletions(-)

diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go
index 52714468ff..71a31d2725 100644
--- a/agent/consul/state/catalog.go
+++ b/agent/consul/state/catalog.go
@@ -2089,6 +2089,10 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
 // parseCheckServiceNodes is used to parse through a given set of services,
 // and query for an associated node and a set of checks. This is the inner
 // method used to return a rich set of results from a more simple query.
+//
+// TODO: idx parameter is not used except as a return value. Remove it.
+// TODO: err parameter is only used for early return. Remove it and check from the
+// caller.
 func parseCheckServiceNodes(
 	tx ReadTxn, ws memdb.WatchSet, idx uint64,
 	services structs.ServiceNodes,
diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go
index d18b720700..e88330c9cd 100644
--- a/agent/consul/state/catalog_events.go
+++ b/agent/consul/state/catalog_events.go
@@ -47,8 +47,6 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
 
 // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
 // of stream.Events that describe the current state of a service health query.
-//
-// TODO: no tests for this yet
 func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
 	return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
 		tx := db.ReadTxn()
diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go
index f7fa21df0b..1d1c0f2f35 100644
--- a/agent/consul/state/catalog_events_test.go
+++ b/agent/consul/state/catalog_events_test.go
@@ -8,15 +8,107 @@ import (
 	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/stretchr/testify/require"
 
-	"github.com/hashicorp/consul/proto/pbcommon"
-
 	"github.com/hashicorp/consul/agent/consul/stream"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/proto/pbcommon"
 	"github.com/hashicorp/consul/proto/pbsubscribe"
 	"github.com/hashicorp/consul/types"
 )
 
+func TestServiceHealthSnapshot(t *testing.T) {
+	store := NewStateStore(nil)
+
+	counter := newIndexCounter()
+	err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
+	require.NoError(t, err)
+	err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
+	require.NoError(t, err)
+	err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
+	require.NoError(t, err)
+
+	fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth)
+	buf := &snapshotAppender{}
+	req := stream.SubscribeRequest{Key: "web"}
+
+	idx, err := fn(req, buf)
+	require.NoError(t, err)
+	require.Equal(t, counter.Last(), idx)
+
+	expected := [][]stream.Event{
+		{
+			testServiceHealthEvent(t, "web", func(e *stream.Event) error {
+				e.Index = counter.Last()
+				csn := getPayloadCheckServiceNode(e.Payload)
+				csn.Node.CreateIndex = 1
+				csn.Node.ModifyIndex = 1
+				csn.Service.CreateIndex = 2
+				csn.Service.ModifyIndex = 2
+				csn.Checks[0].CreateIndex = 1
+				csn.Checks[0].ModifyIndex = 1
+				csn.Checks[1].CreateIndex = 2
+				csn.Checks[1].ModifyIndex = 2
+				return nil
+			}),
+		},
+		{
+			testServiceHealthEvent(t, "web", evNode2, func(e *stream.Event) error {
+				e.Index = counter.Last()
+				csn := getPayloadCheckServiceNode(e.Payload)
+				csn.Node.CreateIndex = 3
+				csn.Node.ModifyIndex = 3
+				csn.Service.CreateIndex = 3
+				csn.Service.ModifyIndex = 3
+				for i := range csn.Checks {
+					csn.Checks[i].CreateIndex = 3
+					csn.Checks[i].ModifyIndex = 3
+				}
+				return nil
+			}),
+		},
+	}
+	assertDeepEqual(t, expected, buf.events, cmpEvents)
+}
+
+type snapshotAppender struct {
+	events [][]stream.Event
+}
+
+func (s *snapshotAppender) Append(events []stream.Event) {
+	s.events = append(s.events, events)
+}
+
+type indexCounter struct {
+	value uint64
+}
+
+func (c *indexCounter) Next() uint64 {
+	c.value++
+	return c.value
+}
+
+func (c *indexCounter) Last() uint64 {
+	return c.value
+}
+
+func newIndexCounter() *indexCounter {
+	return &indexCounter{}
+}
+
+var _ stream.SnapshotAppender = (*snapshotAppender)(nil)
+
+func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
+	return func(e *stream.Event) error {
+		e.Index = idx
+		csn := getPayloadCheckServiceNode(e.Payload)
+		csn.Node.CreateIndex = create
+		csn.Node.ModifyIndex = modify
+		csn.Service.CreateIndex = create
+		csn.Service.ModifyIndex = modify
+		return nil
+	}
+}
+
 func TestServiceHealthEventsFromChanges(t *testing.T) {
 	cases := []struct {
 		Name string

From 432dd2d204bc7529ff74ea74e6d3b2268c1d239b Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Tue, 17 Nov 2020 15:00:36 -0500
Subject: [PATCH 4/6] consul: Add integration tests of streaming.

Restored from streaming-rpc-final branch.

Co-authored-by: Paul Banks
---
 agent/consul/subscribe_backend_test.go | 462 +++++++++++++++++++++++++
 1 file changed, 462 insertions(+)
 create mode 100644 agent/consul/subscribe_backend_test.go

diff --git a/agent/consul/subscribe_backend_test.go b/agent/consul/subscribe_backend_test.go
new file mode 100644
index 0000000000..e7debf0ab5
--- /dev/null
+++ b/agent/consul/subscribe_backend_test.go
@@ -0,0 +1,462 @@
+package consul
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+	gogrpc "google.golang.org/grpc"
+	grpcresolver "google.golang.org/grpc/resolver"
+
+	grpc "github.com/hashicorp/consul/agent/grpc"
+	"github.com/hashicorp/consul/agent/grpc/resolver"
+	"github.com/hashicorp/consul/agent/router"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/hashicorp/consul/proto/pbservice"
+	"github.com/hashicorp/consul/proto/pbsubscribe"
+	"github.com/hashicorp/consul/testrpc"
+)
+
+func TestSubscribeBackend_IntegrationWithServer_TLSEnabled(t *testing.T) {
+	t.Parallel()
+
+	_, conf1 := testServerConfig(t)
+	conf1.VerifyIncoming = true
+	conf1.VerifyOutgoing = true
+	conf1.RPCConfig.EnableStreaming = true
+	configureTLS(conf1)
+	server, err := newServer(t, conf1)
+	require.NoError(t, err)
+	defer server.Shutdown()
+
+	client, builder := newClientWithGRPCResolver(t, configureTLS, clientConfigVerifyOutgoing)
+
+	// Try to join
+	testrpc.WaitForLeader(t, server.RPC, "dc1")
+	joinLAN(t, client, server)
+	testrpc.WaitForTestAgent(t, client.RPC, "dc1")
+
+	// Register a dummy node with our service on it.
+	{
+		req := &structs.RegisterRequest{
+			Node:       "node1",
+			Address:    "3.4.5.6",
+			Datacenter: "dc1",
+			Service: &structs.NodeService{
+				ID:      "redis1",
+				Service: "redis",
+				Address: "3.4.5.6",
+				Port:    8080,
+			},
+		}
+		var out struct{}
+		require.NoError(t, server.RPC("Catalog.Register", &req, &out))
+	}
+
+	// Start a Subscribe call to our streaming endpoint from the client.
+	{
+		pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
+		conn, err := pool.ClientConn("dc1")
+		require.NoError(t, err)
+
+		streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		defer cancel()
+		req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
+		streamHandle, err := streamClient.Subscribe(ctx, req)
+		require.NoError(t, err)
+
+		// Start a goroutine to read updates off the subscription.
+		eventCh := make(chan *pbsubscribe.Event, 0)
+		go receiveSubscribeEvents(t, eventCh, streamHandle)
+
+		var snapshotEvents []*pbsubscribe.Event
+		for i := 0; i < 2; i++ {
+			select {
+			case event := <-eventCh:
+				snapshotEvents = append(snapshotEvents, event)
+			case <-time.After(3 * time.Second):
+				t.Fatalf("did not receive events past %d", len(snapshotEvents))
+			}
+		}
+
+		// Make sure the snapshot events come back with no issues.
+		require.Len(t, snapshotEvents, 2)
+	}
+
+	// Start a Subscribe call to our streaming endpoint from the server's loopback client.
+	{
+
+		pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
+		conn, err := pool.ClientConn("dc1")
+		require.NoError(t, err)
+
+		retryFailedConn(t, conn)
+
+		streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		defer cancel()
+		req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
+		streamHandle, err := streamClient.Subscribe(ctx, req)
+		require.NoError(t, err)
+
+		// Start a goroutine to read updates off the subscription.
+		eventCh := make(chan *pbsubscribe.Event, 0)
+		go receiveSubscribeEvents(t, eventCh, streamHandle)
+
+		var snapshotEvents []*pbsubscribe.Event
+		for i := 0; i < 2; i++ {
+			select {
+			case event := <-eventCh:
+				snapshotEvents = append(snapshotEvents, event)
+			case <-time.After(3 * time.Second):
+				t.Fatalf("did not receive events past %d", len(snapshotEvents))
+			}
+		}
+
+		// Make sure the snapshot events come back with no issues.
+		require.Len(t, snapshotEvents, 2)
+	}
+}
+
+// receiveSubscribeEvents reads events from the subscription stream and sends
+// them to the channel.
+func receiveSubscribeEvents(t *testing.T, ch chan *pbsubscribe.Event, handle pbsubscribe.StateChangeSubscription_SubscribeClient) {
+	for {
+		event, err := handle.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			if strings.Contains(err.Error(), "context deadline exceeded") ||
+				strings.Contains(err.Error(), "context canceled") {
+				break
+			}
+			t.Log(err)
+		}
+		ch <- event
+	}
+}
+
+func TestSubscribeBackend_IntegrationWithServer_TLSReload(t *testing.T) {
+	t.Parallel()
+
+	// Set up a server with initially bad certificates.
+	_, conf1 := testServerConfig(t)
+	conf1.VerifyIncoming = true
+	conf1.VerifyOutgoing = true
+	conf1.CAFile = "../../test/ca/root.cer"
+	conf1.CertFile = "../../test/key/ssl-cert-snakeoil.pem"
+	conf1.KeyFile = "../../test/key/ssl-cert-snakeoil.key"
+	conf1.RPCConfig.EnableStreaming = true
+
+	server, err := newServer(t, conf1)
+	require.NoError(t, err)
+	defer server.Shutdown()
+
+	// Set up a client with valid certs and verify_outgoing = true
+	client, builder := newClientWithGRPCResolver(t, configureTLS, clientConfigVerifyOutgoing)
+
+	testrpc.WaitForLeader(t, server.RPC, "dc1")
+
+	// Subscribe calls should fail initially
+	joinLAN(t, client, server)
+
+	pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
+	conn, err := pool.ClientConn("dc1")
+	require.NoError(t, err)
+
+	streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
+	_, err = streamClient.Subscribe(ctx, req)
+	require.Error(t, err)
+
+	// Reload the server with valid certs
+	newConf := server.config.ToTLSUtilConfig()
+	newConf.CertFile = "../../test/key/ourdomain.cer"
+	newConf.KeyFile = "../../test/key/ourdomain.key"
+	server.tlsConfigurator.Update(newConf)
+
+	// Try the subscribe call again
+	retryFailedConn(t, conn)
+
+	streamClient = pbsubscribe.NewStateChangeSubscriptionClient(conn)
+	_, err = streamClient.Subscribe(ctx, req)
+	require.NoError(t, err)
+}
+
+func clientConfigVerifyOutgoing(config *Config) {
+	config.VerifyOutgoing = true
+}
+
+// retryFailedConn forces the ClientConn to reset its backoff timer and retry the connection,
+// to simulate the client eventually retrying after the initial failure. This is used both to simulate
+// retrying after an expected failure as well as to avoid flakiness when running many tests in parallel.
+func retryFailedConn(t *testing.T, conn *gogrpc.ClientConn) {
+	state := conn.GetState()
+	if state.String() != "TRANSIENT_FAILURE" {
+		return
+	}
+
+	// If the connection has failed, retry and wait for a state change.
+	conn.ResetConnectBackoff()
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	require.True(t, conn.WaitForStateChange(ctx, state))
+}
+
+func TestSubscribeBackend_IntegrationWithServer_DeliversAllMessages(t *testing.T) {
+	if testing.Short() {
+		t.Skip("too slow for -short run")
+	}
+	// This is a fuzz/probabilistic test to try to provoke streaming into dropping
+	// messages. There is a bug in the initial implementation that should make
+	// this fail. While we can't be certain a pass means it's correct, it is
+	// useful for finding bugs in our concurrency design.
+
+	// The issue is that when updates arrive quickly, some can occur in between
+	// us making the snapshot and beginning the stream of updates, and we
+	// shouldn't miss any of them.
+
+	// To test this, we will run a background goroutine that will write updates as
+	// fast as possible while we then try to stream the results and ensure that we
+	// see every change. We'll make the updates monotonically increasing so we can
+	// easily tell if we missed one.
+
+	_, server := testServerWithConfig(t, func(c *Config) {
+		c.Datacenter = "dc1"
+		c.Bootstrap = true
+		c.RPCConfig.EnableStreaming = true
+	})
+	defer server.Shutdown()
+	codec := rpcClient(t, server)
+	defer codec.Close()
+
+	client, builder := newClientWithGRPCResolver(t)
+
+	// Try to join
+	testrpc.WaitForLeader(t, server.RPC, "dc1")
+	joinLAN(t, client, server)
+	testrpc.WaitForTestAgent(t, client.RPC, "dc1")
+
+	// Register a whole bunch of service instances so that the initial snapshot on
+	// subscribe is big enough to take a bit of time to load, giving more
+	// opportunity for missed updates if there is a bug.
+	for i := 0; i < 1000; i++ {
+		req := &structs.RegisterRequest{
+			Node:       fmt.Sprintf("node-redis-%03d", i),
+			Address:    "3.4.5.6",
+			Datacenter: "dc1",
+			Service: &structs.NodeService{
+				ID:      fmt.Sprintf("redis-%03d", i),
+				Service: "redis",
+				Port:    11211,
+			},
+		}
+		var out struct{}
+		require.NoError(t, server.RPC("Catalog.Register", &req, &out))
+	}
+
+	// Start background writer
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	go func() {
+		// Update the registration with a monotonically increasing port as fast as
+		// we can.
+		req := &structs.RegisterRequest{
+			Node:       "node1",
+			Address:    "3.4.5.6",
+			Datacenter: "dc1",
+			Service: &structs.NodeService{
+				ID:      "redis-canary",
+				Service: "redis",
+				Port:    0,
+			},
+		}
+		for {
+			if ctx.Err() != nil {
+				return
+			}
+			var out struct{}
+			require.NoError(t, server.RPC("Catalog.Register", &req, &out))
+			req.Service.Port++
+			if req.Service.Port > 100 {
+				return
+			}
+			time.Sleep(1 * time.Millisecond)
+		}
+	}()
+
+	pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
+	conn, err := pool.ClientConn("dc1")
+	require.NoError(t, err)
+
+	streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
+
+	// Now start a whole bunch of streamers in parallel to maximise chance of
+	// catching a race.
+	n := 5
+	var wg sync.WaitGroup
+	var updateCount uint64
+	// Buffered error chan so that workers can exit and terminate wg without
+	// blocking on send. We collect errors this way since t isn't thread safe.
+	errCh := make(chan error, n)
+	for i := 0; i < n; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			verifyMonotonicStreamUpdates(ctx, t, streamClient, i, &updateCount, errCh)
+		}()
+	}
+
+	// Wait until all subscribers have verified the first bunch of updates all got
+	// delivered.
+	wg.Wait()
+
+	close(errCh)
+
+	// Require that none of them errored. Since we closed the chan above, this loop
+	// should terminate immediately if no errors were buffered.
+	for err := range errCh {
+		require.NoError(t, err)
+	}
+
+	// Sanity check that at least some non-snapshot messages were delivered. We
+	// can't know exactly how many because it's timing dependent based on when
+	// each subscriber's snapshot occurs.
+	require.True(t, atomic.LoadUint64(&updateCount) > 0,
+		"at least some of the subscribers should have received non-snapshot updates")
+}
+
+func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *resolver.ServerResolverBuilder) {
+	builder := resolver.NewServerResolverBuilder(resolver.Config{Scheme: t.Name()})
+	registerWithGRPC(builder)
+
+	_, config := testClientConfig(t)
+	for _, op := range ops {
+		op(config)
+	}
+
+	deps := newDefaultDeps(t, config)
+	deps.Router = router.NewRouter(
+		deps.Logger,
+		config.Datacenter,
+		fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter),
+		builder)
+
+	client, err := NewClient(config, deps)
+	require.NoError(t, err)
+	t.Cleanup(func() {
+		client.Shutdown()
+	})
+	return client, builder
+}
+
+var grpcRegisterLock sync.Mutex
+
+// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver.
+// grpc/resolver.Register expects all registration to happen at init and does
+// not allow for concurrent registration, so this function synchronizes
+// registrations with a lock to support parallel testing.
+func registerWithGRPC(b grpcresolver.Builder) {
+	grpcRegisterLock.Lock()
+	defer grpcRegisterLock.Unlock()
+	grpcresolver.Register(b)
+}
+
+type testLogger interface {
+	Logf(format string, args ...interface{})
+}
+
+func verifyMonotonicStreamUpdates(ctx context.Context, logger testLogger, client pbsubscribe.StateChangeSubscriptionClient, i int, updateCount *uint64, errCh chan<- error) {
+	req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
+	streamHandle, err := client.Subscribe(ctx, req)
+	if err != nil {
+		if strings.Contains(err.Error(), "context deadline exceeded") ||
+			strings.Contains(err.Error(), "context canceled") {
+			logger.Logf("subscriber %05d: context cancelled before loop", i)
+			return
+		}
+		errCh <- err
+		return
+	}
+
+	snapshotDone := false
+	expectPort := int32(0)
+	for {
+		event, err := streamHandle.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			if strings.Contains(err.Error(), "context deadline exceeded") ||
+				strings.Contains(err.Error(), "context canceled") {
+				break
+			}
+			errCh <- err
+			return
+		}
+
+		switch {
+		case event.GetEndOfSnapshot():
+			snapshotDone = true
+			logger.Logf("subscriber %05d: snapshot done, expect next port to be %d", i, expectPort)
+		case snapshotDone:
+			// Verify we get all updates in order
+			svc, err := svcOrErr(event)
+			if err != nil {
+				errCh <- err
+				return
+			}
+			if expectPort != svc.Port {
+				errCh <- fmt.Errorf("subscriber %05d: missed %d update(s)!", i, svc.Port-expectPort)
+				return
+			}
+			atomic.AddUint64(updateCount, 1)
+			logger.Logf("subscriber %05d: got event with correct port=%d", i, expectPort)
+			expectPort++
+		default:
+			// This is a snapshot update. Check if it's an update for the canary
+			// instance that got applied before our snapshot was sent (likely)
+			svc, err := svcOrErr(event)
+			if err != nil {
+				errCh <- err
+				return
+			}
+			if svc.ID == "redis-canary" {
+				// Update the expected port we see in the next update to be one more
+				// than the port in the snapshot.
+				expectPort = svc.Port + 1
+				logger.Logf("subscriber %05d: saw canary in snapshot with port %d", i, svc.Port)
+			}
+		}
+		if expectPort > 100 {
+			return
+		}
+	}
+}
+
+func svcOrErr(event *pbsubscribe.Event) (*pbservice.NodeService, error) {
+	health := event.GetServiceHealth()
+	if health == nil {
+		return nil, fmt.Errorf("not a health event: %#v", event)
+	}
+	csn := health.CheckServiceNode
+	if csn == nil {
+		return nil, fmt.Errorf("nil CSN: %#v", event)
+	}
+	if csn.Service == nil {
+		return nil, fmt.Errorf("nil service: %#v", event)
+	}
+	return csn.Service, nil
+}

From 55add287256f8dbe6f8b705c5bb5d7bde2af2683 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Fri, 4 Dec 2020 16:34:29 -0500
Subject: [PATCH 5/6] catalog_events: set the right key for connect snapshots

Add a test for catalog_event snapshot on connect topic
---
 agent/consul/state/catalog_events.go      | 14 +++--
 agent/consul/state/catalog_events_test.go | 64 +++++++++++++++++++++++
 2 files changed, 74 insertions(+), 4 deletions(-)

diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go
index e88330c9cd..5aebcca6ee 100644
--- a/agent/consul/state/catalog_events.go
+++ b/agent/consul/state/catalog_events.go
@@ -64,11 +64,17 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
 			event := stream.Event{
 				Index: idx,
 				Topic: topic,
-				Payload: EventPayloadCheckServiceNode{
-					Op:    pbsubscribe.CatalogOp_Register,
-					Value: &n,
-				},
 			}
+			payload := EventPayloadCheckServiceNode{
+				Op:    pbsubscribe.CatalogOp_Register,
+				Value: &n,
+			}
+
+			if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
+				payload.key = n.Service.Proxy.DestinationServiceName
+			}
+
+			event.Payload = payload
 
 			// append each event as a separate item so that they can be serialized
 			// separately, to prevent the encoding of one massive message.
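Why the key matters: subscribers filter events by comparing their requested key against the event payload (see EventPayloadCheckServiceNode.MatchesKey in the diff context above). A connect proxy is registered under its own sidecar service name, so without the override a subscriber watching the logical "web" service on the connect topic would never match the proxy's snapshot events. Below is a minimal standalone sketch of that matching rule; the real MatchesKey body is not shown in this diff, so the fallback logic and the "web-sidecar-proxy" name are assumptions for illustration only.

package main

import "fmt"

// eventPayload is a simplified stand-in for state.EventPayloadCheckServiceNode.
type eventPayload struct {
	serviceName string // Value.Service.Service in the real payload
	key         string // set to Proxy.DestinationServiceName by this patch
}

// matchesKey models the assumed rule: prefer the override key when present,
// otherwise fall back to the service's own name.
func (p eventPayload) matchesKey(key string) bool {
	name := p.serviceName
	if p.key != "" {
		name = p.key
	}
	return key == "" || key == name
}

func main() {
	sidecar := eventPayload{serviceName: "web-sidecar-proxy", key: "web"}
	fmt.Println(sidecar.matchesKey("web")) // true: connect subscribers for "web" see the proxy

	unkeyed := eventPayload{serviceName: "web-sidecar-proxy"}
	fmt.Println(unkeyed.matchesKey("web")) // false: the pre-patch behavior
}

This is why TestServiceHealthSnapshot_ConnectTopic below asserts ep.key = "web" on events whose underlying service is the sidecar.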
diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go
index 1d1c0f2f35..b9ef8eadea 100644
--- a/agent/consul/state/catalog_events_test.go
+++ b/agent/consul/state/catalog_events_test.go
@@ -70,6 +70,70 @@ func TestServiceHealthSnapshot(t *testing.T) {
 	assertDeepEqual(t, expected, buf.events, cmpEvents)
 }
 
+func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
+	store := NewStateStore(nil)
+
+	counter := newIndexCounter()
+	err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
+	require.NoError(t, err)
+	err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
+	require.NoError(t, err)
+	err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regSidecar))
+	require.NoError(t, err)
+	err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
+	require.NoError(t, err)
+	err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar))
+	require.NoError(t, err)
+
+	fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
+	buf := &snapshotAppender{}
+	req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect}
+
+	idx, err := fn(req, buf)
+	require.NoError(t, err)
+	require.Equal(t, counter.Last(), idx)
+
+	expected := [][]stream.Event{
+		{
+			testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error {
+				e.Index = counter.Last()
+				ep := e.Payload.(EventPayloadCheckServiceNode)
+				ep.key = "web"
+				e.Payload = ep
+				csn := ep.Value
+				csn.Node.CreateIndex = 1
+				csn.Node.ModifyIndex = 1
+				csn.Service.CreateIndex = 3
+				csn.Service.ModifyIndex = 3
+				csn.Checks[0].CreateIndex = 1
+				csn.Checks[0].ModifyIndex = 1
+				csn.Checks[1].CreateIndex = 3
+				csn.Checks[1].ModifyIndex = 3
+				return nil
+			}),
+		},
+		{
+			testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error {
+				e.Index = counter.Last()
+				ep := e.Payload.(EventPayloadCheckServiceNode)
+				ep.key = "web"
+				e.Payload = ep
+				csn := ep.Value
+				csn.Node.CreateIndex = 4
+				csn.Node.ModifyIndex = 4
+				csn.Service.CreateIndex = 5
+				csn.Service.ModifyIndex = 5
+				csn.Checks[0].CreateIndex = 4
+				csn.Checks[0].ModifyIndex = 4
+				csn.Checks[1].CreateIndex = 5
+				csn.Checks[1].ModifyIndex = 5
+				return nil
+			}),
+		},
+	}
+	assertDeepEqual(t, expected, buf.events, cmpEvents)
+}
+
 type snapshotAppender struct {
 	events [][]stream.Event
 }

From 1156519d974d41c15e942861dfdb7e5b20611851 Mon Sep 17 00:00:00 2001
From: Daniel Nephin
Date: Thu, 10 Dec 2020 13:48:10 -0500
Subject: [PATCH 6/6] docs: fix grpc metric names

---
 website/content/docs/agent/telemetry.mdx | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/website/content/docs/agent/telemetry.mdx b/website/content/docs/agent/telemetry.mdx
index e26377ef41..f358a8256f 100644
--- a/website/content/docs/agent/telemetry.mdx
+++ b/website/content/docs/agent/telemetry.mdx
@@ -280,10 +280,10 @@ These metrics are used to monitor the health of the Consul servers.
 | `consul.txn.apply` | This measures the time spent applying a transaction operation. | ms | timer |
 | `consul.txn.read` | This measures the time spent returning a read transaction. | ms | timer |
 | `consul.grpc.client.request.count` | This metric counts the number of gRPC requests made by the client agent to a Consul server. | requests | counter |
-| `consul.grpc.client.connect.count` | This metric counts the number of new gRPC connections opened by the client agent to a Consul server. | connections | counter |
+| `consul.grpc.client.connection.count` | This metric counts the number of new gRPC connections opened by the client agent to a Consul server. | connections | counter |
 | `consul.grpc.client.connections` | This metric measures the number of active gRPC connections open from the client agent to any Consul servers. | connections | gauge |
 | `consul.grpc.server.request.count` | This metric counts the number of gRPC requests received by the server. | requests | counter |
-| `consul.grpc.server.connect.count` | This metric counts the number of new gRPC connections received by the server. | connections | counter |
+| `consul.grpc.server.connection.count` | This metric counts the number of new gRPC connections received by the server. | connections | counter |
 | `consul.grpc.server.connections` | This metric measures the number of active gRPC connections open on the server. | connections | gauge |
 | `consul.grpc.server.stream.count` | This metric counts the number of new gRPC streams received by the server. | streams | counter |
 | `consul.grpc.server.streams` | This metric measures the number of active gRPC streams handled by the server. | streams | gauge |
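These gauges and counters can be captured the same way TestHealthServiceNodes_Blocking does in patch 2: route metrics into an in-memory go-metrics sink and inspect the recorded names. A minimal sketch using github.com/armon/go-metrics follows; only the sink wiring mirrors the test above, and the emitted gauge is a stand-in for what the agent's gRPC instrumentation would record, not a call the agent itself makes here.

package main

import (
	"fmt"
	"time"

	"github.com/armon/go-metrics"
)

func main() {
	// Keep metrics in memory, as the streaming test in patch 2 does.
	sink := metrics.NewInmemSink(5*time.Second, time.Minute)
	if _, err := metrics.NewGlobal(&metrics.Config{
		ServiceName:     "consul",
		AllowedPrefixes: []string{"consul.grpc."}, // drop everything except gRPC metrics
	}, sink); err != nil {
		panic(err)
	}

	// Stand-in for the agent: emit one of the documented gauge names.
	metrics.SetGauge([]string{"grpc", "client", "connections"}, 1)

	// Inspect what was recorded in the current interval.
	for name := range sink.Data()[0].Gauges {
		fmt.Println(name) // e.g. consul.grpc.client.connections
	}
}

The AllowedPrefixes filter is why the test in patch 2 only asserts on gauges prefixed with "testing.grpc.": everything else is discarded before it reaches the sink.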