From 21bde385174473f4d537f3a5fcf6b8fcfec60218 Mon Sep 17 00:00:00 2001 From: Derek Menteer Date: Fri, 15 Dec 2023 11:13:26 -0600 Subject: [PATCH] Fix bug with prepared queries using sameness-groups. This commit fixes an issue where the partition was not properly set on the peering query failover target created from sameness-groups. Before this change, it was always empty, meaning that the data would be queried with respect to the default partition always. This resulted in a situation where a PQ that was attempting to use a sameness-group for failover would select peers from the default partition, rather than the partition of the sameness-group itself. --- .changelog/_7773.txt | 3 + agent/consul/prepared_query_endpoint_test.go | 128 +++++++++---------- 2 files changed, 67 insertions(+), 64 deletions(-) create mode 100644 .changelog/_7773.txt diff --git a/.changelog/_7773.txt b/.changelog/_7773.txt new file mode 100644 index 0000000000..6f5b340f56 --- /dev/null +++ b/.changelog/_7773.txt @@ -0,0 +1,3 @@ +```release-note:bug +prepared-query: (Enterprise-only) Fix issue where sameness-group failover targets to peers would attempt to query data from the default partition, rather than the sameness-group's partition always. +``` diff --git a/agent/consul/prepared_query_endpoint_test.go b/agent/consul/prepared_query_endpoint_test.go index f0d63dc277..2761df4093 100644 --- a/agent/consul/prepared_query_endpoint_test.go +++ b/agent/consul/prepared_query_endpoint_test.go @@ -3357,6 +3357,7 @@ type executeServers struct { func createExecuteServers(t *testing.T) *executeServers { es := newExecuteServers(t) + es.initPeering(t, "") es.initWanFed(t) es.exportPeeringServices(t) es.initTokens(t) @@ -3365,7 +3366,6 @@ func createExecuteServers(t *testing.T) *executeServers { } func newExecuteServers(t *testing.T) *executeServers { - // Setup server dir1, s1 := testServerWithConfig(t, func(c *Config) { c.PrimaryDatacenter = "dc1" @@ -3416,67 +3416,6 @@ func newExecuteServers(t *testing.T) *executeServers { testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root")) testrpc.WaitForLeader(t, s3.RPC, "dc3") - acceptingPeerName := "my-peer-accepting-server" - dialingPeerName := "my-peer-dialing-server" - - // Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service - { - // Create a peering by generating a token. - ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - t.Cleanup(cancel) - - options := structs.QueryOptions{Token: "root"} - ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options) - require.NoError(t, err) - - conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())), - //nolint:staticcheck - grpc.WithInsecure(), - grpc.WithBlock()) - require.NoError(t, err) - t.Cleanup(func() { - conn.Close() - }) - - peeringClient := pbpeering.NewPeeringServiceClient(conn) - req := pbpeering.GenerateTokenRequest{ - PeerName: dialingPeerName, - } - resp, err := peeringClient.GenerateToken(ctx, &req) - require.NoError(t, err) - - conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(), - grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), - //nolint:staticcheck - grpc.WithInsecure(), - grpc.WithBlock()) - require.NoError(t, err) - t.Cleanup(func() { - conn.Close() - }) - - peeringClient = pbpeering.NewPeeringServiceClient(conn) - establishReq := pbpeering.EstablishRequest{ - PeerName: acceptingPeerName, - PeeringToken: resp.PeeringToken, - } - establishResp, err := peeringClient.Establish(ctx, &establishReq) - require.NoError(t, err) - require.NotNil(t, establishResp) - - readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName}) - require.NoError(t, err) - require.NotNil(t, readResp) - - // Wait for the stream to be connected. - retry.Run(t, func(r *retry.R) { - status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID()) - require.True(r, found) - require.True(r, status.Connected) - }) - } - es := executeServers{ server: &serverTestMetadata{ server: s1, @@ -3487,8 +3426,8 @@ func newExecuteServers(t *testing.T) *executeServers { server: s3, codec: codec3, datacenter: "dc3", - dialingPeerName: dialingPeerName, - acceptingPeerName: acceptingPeerName, + dialingPeerName: "my-peer-dialing-server", + acceptingPeerName: "my-peer-accepting-server", }, } @@ -3562,3 +3501,64 @@ func (es *executeServers) initWanFed(t *testing.T) { datacenter: "dc2", } } + +func (es *executeServers) initPeering(t *testing.T, localPartition string) { + // Create a peering by generating a token. + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + t.Cleanup(cancel) + + options := structs.QueryOptions{Token: "root"} + ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options) + require.NoError(t, err) + + conn, err := grpc.DialContext(ctx, es.peeringServer.server.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(es.peeringServer.server.config.RPCAddr.String())), + //nolint:staticcheck + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { + conn.Close() + }) + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + req := pbpeering.GenerateTokenRequest{ + PeerName: es.peeringServer.dialingPeerName, + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + conn, err = grpc.DialContext(ctx, es.server.server.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(es.server.server.config.RPCAddr.String())), + //nolint:staticcheck + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + t.Cleanup(func() { + conn.Close() + }) + + peeringClient = pbpeering.NewPeeringServiceClient(conn) + establishReq := pbpeering.EstablishRequest{ + Partition: localPartition, + PeerName: es.peeringServer.acceptingPeerName, + PeeringToken: resp.PeeringToken, + } + establishResp, err := peeringClient.Establish(ctx, &establishReq) + require.NoError(t, err) + require.NotNil(t, establishResp) + + readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{ + Partition: localPartition, + Name: es.peeringServer.acceptingPeerName, + }) + require.NoError(t, err) + require.NotNil(t, readResp) + + // Wait for the stream to be connected. + retry.Run(t, func(r *retry.R) { + status, found := es.server.server.peerStreamServer.StreamStatus(readResp.GetPeering().GetID()) + require.True(r, found) + require.True(r, status.Connected) + }) +}