From 21cca2dc5b1b35e1db1151b4dcacefa3975e7d92 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 19 Nov 2024 09:36:13 -0500 Subject: [PATCH] Fix PeerUpstreamEndpoints and UpstreamPeerTrustBundles to only Cancel watch when needed, otherwise keep the watch active (#21871) * fix to only reset peering watches when no other target need watching * remove unused logger * add changelog * Update .changelog/21871.txt Co-authored-by: Nitya Dhanushkodi --------- Co-authored-by: Nitya Dhanushkodi --- .changelog/21871.txt | 3 ++ agent/proxycfg/api_gateway.go | 6 +--- agent/proxycfg/connect_proxy.go | 53 ++----------------------------- agent/proxycfg/ingress_gateway.go | 7 +--- agent/proxycfg/state.go | 50 ++++++++++++++++++++++++++++- agent/proxycfg/upstreams.go | 25 ++++++--------- 6 files changed, 67 insertions(+), 77 deletions(-) create mode 100644 .changelog/21871.txt diff --git a/.changelog/21871.txt b/.changelog/21871.txt new file mode 100644 index 0000000000..425ba2b5e2 --- /dev/null +++ b/.changelog/21871.txt @@ -0,0 +1,3 @@ +```release-note:bug +proxycfg: fix a bug where peered upstreams watches are canceled even when another target needs it. +``` diff --git a/agent/proxycfg/api_gateway.go b/agent/proxycfg/api_gateway.go index eb5d246462..0c44e5bfd4 100644 --- a/agent/proxycfg/api_gateway.go +++ b/agent/proxycfg/api_gateway.go @@ -476,16 +476,12 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat cancelUpstream() delete(snap.APIGateway.WatchedUpstreams[upstreamID], targetID) delete(snap.APIGateway.WatchedUpstreamEndpoints[upstreamID], targetID) - - if targetUID := NewUpstreamIDFromTargetID(targetID); targetUID.Peer != "" { - snap.APIGateway.PeerUpstreamEndpoints.CancelWatch(targetUID) - snap.APIGateway.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer) - } } cancelDiscoChain() delete(snap.APIGateway.WatchedDiscoveryChains, upstreamID) } + reconcilePeeringWatches(snap.APIGateway.DiscoveryChain, snap.APIGateway.UpstreamConfig, snap.APIGateway.PeeredUpstreams, snap.APIGateway.PeerUpstreamEndpoints, snap.APIGateway.UpstreamPeerTrustBundles) return nil } diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index 0a8c173792..35c0462cb3 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -380,49 +380,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s // // Clean up data // - - peeredChainTargets := make(map[UpstreamID]struct{}) - for _, discoChain := range snap.ConnectProxy.DiscoveryChain { - for _, target := range discoChain.Targets { - if target.Peer == "" { - continue - } - uid := NewUpstreamIDFromTargetID(target.ID) - peeredChainTargets[uid] = struct{}{} - } - } - - validPeerNames := make(map[string]struct{}) - - // Iterate through all known endpoints and remove references to upstream IDs that weren't in the update - snap.ConnectProxy.PeerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool { - // Peered upstream is explicitly defined in upstream config - if _, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok { - validPeerNames[uid.Peer] = struct{}{} - return true - } - // Peered upstream came from dynamic source of imported services - if _, ok := seenUpstreams[uid]; ok { - validPeerNames[uid.Peer] = struct{}{} - return true - } - // Peered upstream came from a discovery chain target - if _, ok := peeredChainTargets[uid]; ok { - validPeerNames[uid.Peer] = struct{}{} - return true - } - snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid) - return true - }) - - // Iterate through all known trust bundles and remove references to any unseen peer names - snap.ConnectProxy.UpstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool { - if _, ok := validPeerNames[peerName]; !ok { - snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(peerName) - } - return true - }) - + reconcilePeeringWatches(snap.ConnectProxy.DiscoveryChain, snap.ConnectProxy.UpstreamConfig, snap.ConnectProxy.PeeredUpstreams, snap.ConnectProxy.PeerUpstreamEndpoints, snap.ConnectProxy.UpstreamPeerTrustBundles) case u.CorrelationID == intentionUpstreamsID: resp, ok := u.Result.(*structs.IndexedServiceList) if !ok { @@ -490,18 +448,13 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s continue } if _, ok := seenUpstreams[uid]; !ok { - for targetID, cancelFn := range targets { + for _, cancelFn := range targets { cancelFn() - - targetUID := NewUpstreamIDFromTargetID(targetID) - if targetUID.Peer != "" { - snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(targetUID) - snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer) - } } delete(snap.ConnectProxy.WatchedUpstreams, uid) } } + reconcilePeeringWatches(snap.ConnectProxy.DiscoveryChain, snap.ConnectProxy.UpstreamConfig, snap.ConnectProxy.PeeredUpstreams, snap.ConnectProxy.PeerUpstreamEndpoints, snap.ConnectProxy.UpstreamPeerTrustBundles) for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints { if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured { continue diff --git a/agent/proxycfg/ingress_gateway.go b/agent/proxycfg/ingress_gateway.go index 3ab5828add..0262ffcb37 100644 --- a/agent/proxycfg/ingress_gateway.go +++ b/agent/proxycfg/ingress_gateway.go @@ -171,18 +171,13 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u UpdateEvent, delete(snap.IngressGateway.WatchedUpstreams[uid], targetID) delete(snap.IngressGateway.WatchedUpstreamEndpoints[uid], targetID) cancelUpstreamFn() - - targetUID := NewUpstreamIDFromTargetID(targetID) - if targetUID.Peer != "" { - snap.IngressGateway.PeerUpstreamEndpoints.CancelWatch(targetUID) - snap.IngressGateway.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer) - } } cancelFn() delete(snap.IngressGateway.WatchedDiscoveryChains, uid) } } + reconcilePeeringWatches(snap.IngressGateway.DiscoveryChain, snap.IngressGateway.UpstreamConfig, snap.IngressGateway.PeeredUpstreams, snap.IngressGateway.PeerUpstreamEndpoints, snap.IngressGateway.UpstreamPeerTrustBundles) if err := s.watchIngressLeafCert(ctx, snap); err != nil { return err diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index b6b9c78f32..d0ae44fbab 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -13,12 +13,15 @@ import ( "sync/atomic" "time" - "github.com/hashicorp/go-hclog" "golang.org/x/time/rate" + "github.com/hashicorp/go-hclog" + cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/proxycfg/internal/watch" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto/private/pbpeering" ) const ( @@ -551,3 +554,48 @@ func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error { EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition), }, correlationId, opts.notifyCh) } + +func reconcilePeeringWatches(compiledDiscoveryChains map[UpstreamID]*structs.CompiledDiscoveryChain, upstreams map[UpstreamID]*structs.Upstream, peeredUpstreams map[UpstreamID]struct{}, peerUpstreamEndpoints watch.Map[UpstreamID, structs.CheckServiceNodes], upstreamPeerTrustBundles watch.Map[PeerName, *pbpeering.PeeringTrustBundle]) { + + peeredChainTargets := make(map[UpstreamID]struct{}) + for _, discoChain := range compiledDiscoveryChains { + for _, target := range discoChain.Targets { + if target.Peer == "" { + continue + } + uid := NewUpstreamIDFromTargetID(target.ID) + peeredChainTargets[uid] = struct{}{} + } + } + + validPeerNames := make(map[string]struct{}) + + // Iterate through all known endpoints and remove references to upstream IDs that weren't in the update + peerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool { + // Peered upstream is explicitly defined in upstream config + if _, ok := upstreams[uid]; ok { + validPeerNames[uid.Peer] = struct{}{} + return true + } + // Peered upstream came from dynamic source of imported services + if _, ok := peeredUpstreams[uid]; ok { + validPeerNames[uid.Peer] = struct{}{} + return true + } + // Peered upstream came from a discovery chain target + if _, ok := peeredChainTargets[uid]; ok { + validPeerNames[uid.Peer] = struct{}{} + return true + } + peerUpstreamEndpoints.CancelWatch(uid) + return true + }) + + // Iterate through all known trust bundles and remove references to any unseen peer names + upstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool { + if _, ok := validPeerNames[peerName]; !ok { + upstreamPeerTrustBundles.CancelWatch(peerName) + } + return true + }) +} diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index 209a3446d9..052e91eb10 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -102,6 +102,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv if err := s.resetWatchesFromChain(ctx, uid, resp.Chain, upstreamsSnapshot); err != nil { return err } + reconcilePeeringWatches(upstreamsSnapshot.DiscoveryChain, upstreamsSnapshot.UpstreamConfig, upstreamsSnapshot.PeeredUpstreams, upstreamsSnapshot.PeerUpstreamEndpoints, upstreamsSnapshot.UpstreamPeerTrustBundles) case strings.HasPrefix(u.CorrelationID, upstreamPeerWatchIDPrefix): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) @@ -301,12 +302,6 @@ func (s *handlerUpstreams) resetWatchesFromChain( delete(snap.WatchedUpstreams[uid], targetID) delete(snap.WatchedUpstreamEndpoints[uid], targetID) cancelFn() - - targetUID := NewUpstreamIDFromTargetID(targetID) - if targetUID.Peer != "" { - snap.PeerUpstreamEndpoints.CancelWatch(targetUID) - snap.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer) - } } var ( @@ -479,8 +474,8 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config var entMeta acl.EnterpriseMeta entMeta.Merge(opts.entMeta) - ctx, cancel := context.WithCancel(ctx) - err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ + peerCtx, cancel := context.WithCancel(ctx) + err := s.dataSources.Health.Notify(peerCtx, &structs.ServiceSpecificRequest{ PeerName: opts.peer, Datacenter: opts.datacenter, QueryOptions: structs.QueryOptions{ @@ -506,25 +501,25 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config return nil } - if ok := snap.PeerUpstreamEndpoints.IsWatched(uid); !ok { + if !snap.PeerUpstreamEndpoints.IsWatched(uid) { snap.PeerUpstreamEndpoints.InitWatch(uid, cancel) } - // Check whether a watch for this peer exists to avoid duplicates. - if ok := snap.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok { - peerCtx, cancel := context.WithCancel(ctx) - if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{ + + if !snap.UpstreamPeerTrustBundles.IsWatched(uid.Peer) { + peerCtx2, cancel2 := context.WithCancel(ctx) + if err := s.dataSources.TrustBundle.Notify(peerCtx2, &cachetype.TrustBundleReadRequest{ Request: &pbpeering.TrustBundleReadRequest{ Name: uid.Peer, Partition: uid.PartitionOrDefault(), }, QueryOptions: structs.QueryOptions{Token: s.token}, }, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil { - cancel() + cancel2() return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err) } - snap.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel) + snap.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel2) } return nil