backport of commit 746a0a1d73 (#16734)

Co-authored-by: Eric <eric@haberkorn.co>
backport/nathancoleman-patch-1/distinctly-growing-parakeet
hc-github-team-consul-core 2023-03-22 06:35:37 -07:00 committed by GitHub
parent f76c9fd247
commit 9021e0c2c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 11 deletions

3
.changelog/16729.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
peering: Fix issue resulting in prepared query failover to cluster peers never un-failing over.
```

View File

@ -468,7 +468,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// by the query setup.
if len(reply.Nodes) == 0 {
wrapper := &queryServerWrapper{srv: p.srv, executeRemote: p.ExecuteRemote}
if err := queryFailover(wrapper, query, args, reply); err != nil {
if err := queryFailover(wrapper, *query, args, reply); err != nil {
return err
}
}
@ -707,7 +707,7 @@ func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
// queryFailover runs an algorithm to determine which DCs to try and then calls
// them to try to locate alternative services.
func queryFailover(q queryServer, query *structs.PreparedQuery,
func queryFailover(q queryServer, query structs.PreparedQuery,
args *structs.PreparedQueryExecuteRequest,
reply *structs.PreparedQueryExecuteResponse) error {
@ -789,7 +789,7 @@ func queryFailover(q queryServer, query *structs.PreparedQuery,
// the remote query as well.
remote := &structs.PreparedQueryExecuteRemoteRequest{
Datacenter: dc,
Query: *query,
Query: query,
Limit: args.Limit,
QueryOptions: args.QueryOptions,
Connect: args.Connect,

View File

@ -2092,16 +2092,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID))
// Update the health of a node to mark it critical.
setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, node string, health string) {
setHealth := func(t *testing.T, codec rpc.ClientCodec, dc string, i int, health string) {
t.Helper()
req := structs.RegisterRequest{
Datacenter: dc,
Node: node,
Node: fmt.Sprintf("node%d", i),
Address: "127.0.0.1",
Service: &structs.NodeService{
Service: "foo",
Port: 8000,
Tags: []string{"dc1", "tag1"},
Tags: []string{dc, fmt.Sprintf("tag%d", i)},
},
Check: &structs.HealthCheck{
Name: "failing",
@ -2113,7 +2113,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
var reply struct{}
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply))
}
setHealth(t, codec1, "dc1", "node1", api.HealthCritical)
setHealth(t, codec1, "dc1", 1, api.HealthCritical)
// The failing node should be filtered.
t.Run("failing node filtered", func(t *testing.T) {
@ -2133,7 +2133,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
})
// Upgrade it to a warning and re-query, should be 10 nodes again.
setHealth(t, codec1, "dc1", "node1", api.HealthWarning)
setHealth(t, codec1, "dc1", 1, api.HealthWarning)
t.Run("warning nodes are included", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
@ -2303,7 +2303,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Now fail everything in dc1 and we should get an empty list back.
for i := 0; i < 10; i++ {
setHealth(t, codec1, "dc1", fmt.Sprintf("node%d", i+1), api.HealthCritical)
setHealth(t, codec1, "dc1", i+1, api.HealthCritical)
}
t.Run("everything is failing so should get empty list", func(t *testing.T) {
req := structs.PreparedQueryExecuteRequest{
@ -2474,7 +2474,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
// Set all checks in dc2 as critical
for i := 0; i < 10; i++ {
setHealth(t, codec2, "dc2", fmt.Sprintf("node%d", i+1), api.HealthCritical)
setHealth(t, codec2, "dc2", i+1, api.HealthCritical)
}
// Now we should see 9 nodes from dc3 (we have the tag filter still)
@ -2493,6 +2493,31 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
expectFailoverPeerNodes(t, &query, &reply, 9)
})
// Set all checks in dc1 as passing
for i := 0; i < 10; i++ {
setHealth(t, codec1, "dc1", i+1, api.HealthPassing)
}
// Nothing is healthy so nothing is returned
t.Run("un-failing over", func(t *testing.T) {
retry.Run(t, func(r *retry.R) {
req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: query.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}
var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
for _, node := range reply.Nodes {
assert.NotEqual(t, "node3", node.Node.Node)
}
expectNodes(t, &query, &reply, 9)
})
})
}
func TestPreparedQuery_Execute_ForwardLeader(t *testing.T) {
@ -2982,7 +3007,7 @@ func (m *mockQueryServer) ExecuteRemote(args *structs.PreparedQueryExecuteRemote
func TestPreparedQuery_queryFailover(t *testing.T) {
t.Parallel()
query := &structs.PreparedQuery{
query := structs.PreparedQuery{
Name: "test",
Service: structs.ServiceQuery{
Failover: structs.QueryFailoverOptions{