Backport of Fix xDS missing endpoint race condition. into release/1.17.x (#19874)

backport of commit 7d8764dc0e

Co-authored-by: Keith Smiley <ksmiley@salesforce.com>
Co-authored-by: Derek Menteer <derek.menteer@hashicorp.com>
pull/19884/head
hc-github-team-consul-core 2023-12-08 11:59:21 -06:00 committed by GitHub
parent 94fd096bfc
commit f80fc2b548
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 184 additions and 7 deletions

3
.changelog/19866.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: ensure child resources are re-sent to Envoy when the parent is updated even if the child already has pending updates.
```

View File

@ -8,6 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"
@ -43,6 +44,11 @@ import (
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
// This environment variable exists as an escape hatch so that users can disable the behavior, if needed.
// Ideally, this is a flag we can remove in 1.19+
var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "")
type deltaRecvResponse int
const (
@ -1080,13 +1086,9 @@ func (t *xDSDeltaType) createDeltaResponse(
}
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}
t.logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
@ -1094,11 +1096,41 @@ func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)
// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
if _, exist := t.deltaChild.childType.resourceVersions[childName]; exist {
t.deltaChild.childType.resourceVersions[childName] = ""
}
if xdsProtocolLegacyChildResend {
return
// TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues.
//
// In this legacy mode, there is a confirmed race condition:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
//
// When this situation happens, Envoy wipes the child endpoints when the cluster is updated,
// but it would never receive new ones. The endpoints would not be resent, because their hash
// never changed since the previous ACK.
//
// Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009],
// it's difficult to state with certainty that no other unexpected side-effects are possible.
// This legacy escape hatch is left in-place in case some other complex race condition crops up.
//
// Longer-term, we should modify the hash of children to include the parent hash so that this
// behavior is implicitly handled, rather than being an edge case.
}
// pendingUpdates can contain newer versions that have been sent to Envoy but
// that we haven't processed an ACK for yet. These need to be cleared out, too,
// so that they aren't moved to resourceVersions by ack()
for nonce := range t.deltaChild.childType.pendingUpdates {
delete(t.deltaChild.childType.pendingUpdates[nonce], childName)
}
}
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {

View File

@ -6,7 +6,6 @@ package xds
import (
"errors"
"fmt"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"strconv"
"strings"
"sync"
@ -14,6 +13,8 @@ import (
"testing"
"time"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/armon/go-metrics"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
@ -821,6 +822,147 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
}
}
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangeBeforeEndpointAck(t *testing.T) {
// This test ensures that the following race condition does not block indefinitely:
// - Send update endpoints
// - Send update cluster
// - Recv ACK endpoints
// - Recv ACK cluster
// Prior to a bug fix, this would have resulted in the endpoints NOT existing in Envoy. This occurred because
// the cluster update implicitly clears the endpoints in Envoy, but we would never re-send the endpoint data
// to compensate for the loss because we would incorrectly ACK the invalid old endpoint hash. Since the
// endpoint's hash did not actually change, they would not be resent.
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)
var snap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "initial setup", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "", nil)
// Send initial cluster discover.
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{})
// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
requireProtocolVersionGauge(t, scenario, "v3", 1)
// Deliver a new snapshot (tcp with one tcp upstream)
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})
var newSnap *proxycfg.ConfigSnapshot
testutil.RunStep(t, "resend cluster immediately", func(t *testing.T) {
// Deliver updated snapshot with new CA roots and leaf certificate. This will not be
// sent to Envoy until the initial set of cluster message is ACKed.
newSnap = newTestSnapshot(t, nil, "", nil)
mgr.DeliverConfig(t, sid, newSnap)
// Envoy then tries to discover endpoints for clusters.
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
"geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul",
},
})
// We should get a response immediately since the config is already present in
// the server for endpoints. Note that this should not be racy if the server
// is behaving well since the Cluster send above should be blocked until we
// deliver a new config version.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(2),
Resources: makeTestResources(t,
makeTestEndpoints(t, snap, "tcp:db"),
makeTestEndpoints(t, snap, "tcp:geo-cache"),
),
})
// After receiving the endpoints Envoy sends an ACK for the clusters
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
// The updated cluster snapshot with new certificates is sent immediately
// after the first is ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(3),
Resources: makeTestResources(t,
// SAME makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, newSnap, "tcp:db"),
makeTestCluster(t, newSnap, "tcp:geo-cache"),
),
})
})
testutil.RunStep(t, "resend endpoints", func(t *testing.T) {
// Envoy requests listeners because it has received endpoints. We won't send listeners
// until Envoy ACKs the second cluster update.
envoy.SendDeltaReq(t, xdscommon.ListenerType, nil)
// Envoy ACKs the endpoints from the first cluster update.
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2)
// Resend endpoints because the clusters changed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.EndpointType,
Nonce: hexString(4),
Resources: makeTestResources(t,
makeTestEndpoints(t, newSnap, "tcp:db"),
makeTestEndpoints(t, newSnap, "tcp:geo-cache"),
),
})
// Envoy ACKs the new cluster and endpoints.
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 3)
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
// Listeners are sent after the cluster and endpoints are ACKed.
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ListenerType,
Nonce: hexString(5),
Resources: makeTestResources(t,
makeTestListener(t, newSnap, "tcp:public_listener"),
makeTestListener(t, newSnap, "tcp:db"),
makeTestListener(t, newSnap, "tcp:geo-cache"),
),
})
// We are caught up, so there should be nothing queued to send.
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
// ACKs the listener
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 5)
})
envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChangesImpactRoutes(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all