mirror of https://github.com/hashicorp/consul
xds: generate clusters directly from API gateway snapshot (#17391)
* endpoints xds cluster configuration * clusters xds native generation * resources test fix * fix reversion in resources_test * Update agent/proxycfg/api_gateway.go Co-authored-by: John Maguire <john.maguire@hashicorp.com> * gofmt * Modify getReadyUpstreams to filter upstreams by listener (#17410) Each listener would previously have all upstreams from any route that bound to the listener. This is problematic when a route bound to one listener also binds to other listeners and so includes upstreams for multiple listeners. The list for a given listener would then wind up including upstreams for other listeners. * Update agent/proxycfg/api_gateway.go Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com> * Restore import blocking * Undo removal of unrelated code --------- Co-authored-by: John Maguire <john.maguire@hashicorp.com> Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>pull/17416/head^2
parent
93bad3ea1b
commit
d34bde0e4e
|
@ -65,13 +65,7 @@ func (s *ResourceGenerator) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapsho
|
|||
}
|
||||
return res, nil
|
||||
case structs.ServiceKindAPIGateway:
|
||||
// TODO Find a cleaner solution, can't currently pass unexported property types
|
||||
var err error
|
||||
cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := s.clustersFromSnapshotIngressGateway(cfgSnap)
|
||||
res, err := s.clustersFromSnapshotAPIGateway(cfgSnap)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -816,6 +810,44 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg
|
|||
return clusters, nil
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) clustersFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
|
||||
var clusters []proto.Message
|
||||
createdClusters := make(map[proxycfg.UpstreamID]bool)
|
||||
readyUpstreamsList := getReadyUpstreams(cfgSnap)
|
||||
|
||||
for _, readyUpstreams := range readyUpstreamsList {
|
||||
for _, upstream := range readyUpstreams.upstreams {
|
||||
uid := proxycfg.NewUpstreamID(&upstream)
|
||||
|
||||
// If we've already created a cluster for this upstream, skip it. Multiple listeners may
|
||||
// reference the same upstream, so we don't need to create duplicate clusters in that case.
|
||||
if createdClusters[uid] {
|
||||
continue
|
||||
}
|
||||
|
||||
// Grab the discovery chain compiled in handlerAPIGateway.recompileDiscoveryChains
|
||||
chain, ok := cfgSnap.APIGateway.DiscoveryChain[uid]
|
||||
if !ok {
|
||||
// this should not happen
|
||||
return nil, fmt.Errorf("no discovery chain for upstream %q", uid)
|
||||
}
|
||||
|
||||
// Generate the list of upstream clusters for the discovery chain
|
||||
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, &upstream, chain, cfgSnap, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, cluster := range upstreamClusters {
|
||||
clusters = append(clusters, cluster)
|
||||
}
|
||||
|
||||
createdClusters[uid] = true
|
||||
}
|
||||
}
|
||||
return clusters, nil
|
||||
}
|
||||
|
||||
func (s *ResourceGenerator) configIngressUpstreamCluster(c *envoy_cluster_v3.Cluster, cfgSnap *proxycfg.ConfigSnapshot, listenerKey proxycfg.IngressListenerKey, u *structs.Upstream) {
|
||||
var threshold *envoy_cluster_v3.CircuitBreakers_Thresholds
|
||||
setThresholdLimit := func(limitType string, limit int) {
|
||||
|
|
|
@ -606,6 +606,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.Co
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resources = append(resources, endpoints...)
|
||||
createdClusters[uid] = struct{}{}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue