Fix bug with prepared queries using sameness-groups.

This commit fixes an issue where the partition was not properly set
on the peering query failover target created from sameness-groups.
Before this change, it was always empty, meaning that the data
would be queried with respect to the default partition always. This
resulted in a situation where a PQ that was attempting to use a
sameness-group for failover would select peers from the default
partition, rather than the partition of the sameness-group itself.
pull/19970/head
Derek Menteer 2023-12-15 11:13:26 -06:00
parent f2b26ac194
commit 21bde38517
2 changed files with 67 additions and 64 deletions

3
.changelog/_7773.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
prepared-query: (Enterprise-only) Fix issue where sameness-group failover targets to peers would attempt to query data from the default partition, rather than the sameness-group's partition always.
```

View File

@ -3357,6 +3357,7 @@ type executeServers struct {
func createExecuteServers(t *testing.T) *executeServers {
es := newExecuteServers(t)
es.initPeering(t, "")
es.initWanFed(t)
es.exportPeeringServices(t)
es.initTokens(t)
@ -3365,7 +3366,6 @@ func createExecuteServers(t *testing.T) *executeServers {
}
func newExecuteServers(t *testing.T) *executeServers {
// Setup server
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
@ -3416,67 +3416,6 @@ func newExecuteServers(t *testing.T) *executeServers {
testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root"))
testrpc.WaitForLeader(t, s3.RPC, "dc3")
acceptingPeerName := "my-peer-accepting-server"
dialingPeerName := "my-peer-dialing-server"
// Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service
{
// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
options := structs.QueryOptions{Token: "root"}
ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options)
require.NoError(t, err)
conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())),
//nolint:staticcheck
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() {
conn.Close()
})
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: dialingPeerName,
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
//nolint:staticcheck
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() {
conn.Close()
})
peeringClient = pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
PeerName: acceptingPeerName,
PeeringToken: resp.PeeringToken,
}
establishResp, err := peeringClient.Establish(ctx, &establishReq)
require.NoError(t, err)
require.NotNil(t, establishResp)
readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName})
require.NoError(t, err)
require.NotNil(t, readResp)
// Wait for the stream to be connected.
retry.Run(t, func(r *retry.R) {
status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID())
require.True(r, found)
require.True(r, status.Connected)
})
}
es := executeServers{
server: &serverTestMetadata{
server: s1,
@ -3487,8 +3426,8 @@ func newExecuteServers(t *testing.T) *executeServers {
server: s3,
codec: codec3,
datacenter: "dc3",
dialingPeerName: dialingPeerName,
acceptingPeerName: acceptingPeerName,
dialingPeerName: "my-peer-dialing-server",
acceptingPeerName: "my-peer-accepting-server",
},
}
@ -3562,3 +3501,64 @@ func (es *executeServers) initWanFed(t *testing.T) {
datacenter: "dc2",
}
}
func (es *executeServers) initPeering(t *testing.T, localPartition string) {
// Create a peering by generating a token.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
options := structs.QueryOptions{Token: "root"}
ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options)
require.NoError(t, err)
conn, err := grpc.DialContext(ctx, es.peeringServer.server.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(es.peeringServer.server.config.RPCAddr.String())),
//nolint:staticcheck
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() {
conn.Close()
})
peeringClient := pbpeering.NewPeeringServiceClient(conn)
req := pbpeering.GenerateTokenRequest{
PeerName: es.peeringServer.dialingPeerName,
}
resp, err := peeringClient.GenerateToken(ctx, &req)
require.NoError(t, err)
conn, err = grpc.DialContext(ctx, es.server.server.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(es.server.server.config.RPCAddr.String())),
//nolint:staticcheck
grpc.WithInsecure(),
grpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() {
conn.Close()
})
peeringClient = pbpeering.NewPeeringServiceClient(conn)
establishReq := pbpeering.EstablishRequest{
Partition: localPartition,
PeerName: es.peeringServer.acceptingPeerName,
PeeringToken: resp.PeeringToken,
}
establishResp, err := peeringClient.Establish(ctx, &establishReq)
require.NoError(t, err)
require.NotNil(t, establishResp)
readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{
Partition: localPartition,
Name: es.peeringServer.acceptingPeerName,
})
require.NoError(t, err)
require.NotNil(t, readResp)
// Wait for the stream to be connected.
retry.Run(t, func(r *retry.R) {
status, found := es.server.server.peerStreamServer.StreamStatus(readResp.GetPeering().GetID())
require.True(r, found)
require.True(r, status.Connected)
})
}