@ -102,6 +102,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
if err := s . resetWatchesFromChain ( ctx , uid , resp . Chain , upstreamsSnapshot ) ; err != nil {
if err := s . resetWatchesFromChain ( ctx , uid , resp . Chain , upstreamsSnapshot ) ; err != nil {
return err
return err
}
}
reconcilePeeringWatches ( upstreamsSnapshot . DiscoveryChain , upstreamsSnapshot . UpstreamConfig , upstreamsSnapshot . PeeredUpstreams , upstreamsSnapshot . PeerUpstreamEndpoints , upstreamsSnapshot . UpstreamPeerTrustBundles )
case strings . HasPrefix ( u . CorrelationID , upstreamPeerWatchIDPrefix ) :
case strings . HasPrefix ( u . CorrelationID , upstreamPeerWatchIDPrefix ) :
resp , ok := u . Result . ( * structs . IndexedCheckServiceNodes )
resp , ok := u . Result . ( * structs . IndexedCheckServiceNodes )
@ -301,12 +302,6 @@ func (s *handlerUpstreams) resetWatchesFromChain(
delete ( snap . WatchedUpstreams [ uid ] , targetID )
delete ( snap . WatchedUpstreams [ uid ] , targetID )
delete ( snap . WatchedUpstreamEndpoints [ uid ] , targetID )
delete ( snap . WatchedUpstreamEndpoints [ uid ] , targetID )
cancelFn ( )
cancelFn ( )
targetUID := NewUpstreamIDFromTargetID ( targetID )
if targetUID . Peer != "" {
snap . PeerUpstreamEndpoints . CancelWatch ( targetUID )
snap . UpstreamPeerTrustBundles . CancelWatch ( targetUID . Peer )
}
}
}
var (
var (
@ -479,8 +474,8 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
var entMeta acl . EnterpriseMeta
var entMeta acl . EnterpriseMeta
entMeta . Merge ( opts . entMeta )
entMeta . Merge ( opts . entMeta )
c tx, cancel := context . WithCancel ( ctx )
peerC tx, cancel := context . WithCancel ( ctx )
err := s . dataSources . Health . Notify ( c tx, & structs . ServiceSpecificRequest {
err := s . dataSources . Health . Notify ( peerC tx, & structs . ServiceSpecificRequest {
PeerName : opts . peer ,
PeerName : opts . peer ,
Datacenter : opts . datacenter ,
Datacenter : opts . datacenter ,
QueryOptions : structs . QueryOptions {
QueryOptions : structs . QueryOptions {
@ -506,25 +501,25 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config
return nil
return nil
}
}
if ok := snap . PeerUpstreamEndpoints . IsWatched ( uid ) ; ! ok {
if ! snap . PeerUpstreamEndpoints . IsWatched ( uid ) {
snap . PeerUpstreamEndpoints . InitWatch ( uid , cancel )
snap . PeerUpstreamEndpoints . InitWatch ( uid , cancel )
}
}
// Check whether a watch for this peer exists to avoid duplicates.
// Check whether a watch for this peer exists to avoid duplicates.
if ok := snap . UpstreamPeerTrustBundles . IsWatched ( uid . Peer ) ; ! ok {
peerCtx , cancel := context . WithCancel ( ctx )
if ! snap . UpstreamPeerTrustBundles . IsWatched ( uid . Peer ) {
if err := s . dataSources . TrustBundle . Notify ( peerCtx , & cachetype . TrustBundleReadRequest {
peerCtx2 , cancel2 := context . WithCancel ( ctx )
if err := s . dataSources . TrustBundle . Notify ( peerCtx2 , & cachetype . TrustBundleReadRequest {
Request : & pbpeering . TrustBundleReadRequest {
Request : & pbpeering . TrustBundleReadRequest {
Name : uid . Peer ,
Name : uid . Peer ,
Partition : uid . PartitionOrDefault ( ) ,
Partition : uid . PartitionOrDefault ( ) ,
} ,
} ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
QueryOptions : structs . QueryOptions { Token : s . token } ,
} , peerTrustBundleIDPrefix + uid . Peer , s . ch ) ; err != nil {
} , peerTrustBundleIDPrefix + uid . Peer , s . ch ) ; err != nil {
cancel ( )
cancel 2 ( )
return fmt . Errorf ( "error while watching trust bundle for peer %q: %w" , uid . Peer , err )
return fmt . Errorf ( "error while watching trust bundle for peer %q: %w" , uid . Peer , err )
}
}
snap . UpstreamPeerTrustBundles . InitWatch ( uid . Peer , cancel )
snap . UpstreamPeerTrustBundles . InitWatch ( uid . Peer , cancel 2 )
}
}
return nil
return nil