From f5c9fa6fa669301f9f6a65920f36a8863fe3f213 Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 8 Mar 2022 14:37:24 -0500 Subject: [PATCH] Make an xdscommon package that will be shared between Consul and Envoy plugins --- agent/xds/clusters_test.go | 3 +- agent/xds/delta.go | 74 ++++------- agent/xds/delta_test.go | 207 ++++++++++++++++--------------- agent/xds/endpoints_test.go | 3 +- agent/xds/listeners_test.go | 3 +- agent/xds/resources.go | 11 +- agent/xds/routes_test.go | 3 +- agent/xds/server.go | 20 +-- agent/xds/xdscommon/xdscommon.go | 49 ++++++++ 9 files changed, 194 insertions(+), 179 deletions(-) create mode 100644 agent/xds/xdscommon/xdscommon.go diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go index 3bda55904a..4b4de8d274 100644 --- a/agent/xds/clusters_test.go +++ b/agent/xds/clusters_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/proxysupport" + "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/sdk/testutil" ) @@ -491,7 +492,7 @@ func TestClustersFromSnapshot(t *testing.T) { return clusters[i].(*envoy_cluster_v3.Cluster).Name < clusters[j].(*envoy_cluster_v3.Cluster).Name }) - r, err := createResponse(ClusterType, "00000001", "00000001", clusters) + r, err := createResponse(xdscommon.ClusterType, "00000001", "00000001", clusters) require.NoError(t, err) t.Run("current", func(t *testing.T) { diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 49360b8e33..cea41d453e 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/logging" ) @@ -93,7 +94,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // resourceMap is the SoTW we are incrementally attempting to sync to envoy. // // type => name => proto - resourceMap = emptyIndexedResources() + resourceMap = xdscommon.EmptyIndexedResources() // currentVersions is the the xDS versioning represented by Resources. // @@ -113,20 +114,20 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // Configure handlers for each type of request we currently care about. handlers := map[string]*xDSDeltaType{ - ListenerType: newDeltaType(generator, stream, ListenerType, func(kind structs.ServiceKind) bool { + xdscommon.ListenerType: newDeltaType(generator, stream, xdscommon.ListenerType, func(kind structs.ServiceKind) bool { return cfgSnap.Kind == structs.ServiceKindIngressGateway }), - RouteType: newDeltaType(generator, stream, RouteType, func(kind structs.ServiceKind) bool { + xdscommon.RouteType: newDeltaType(generator, stream, xdscommon.RouteType, func(kind structs.ServiceKind) bool { return cfgSnap.Kind == structs.ServiceKindIngressGateway }), - ClusterType: newDeltaType(generator, stream, ClusterType, func(kind structs.ServiceKind) bool { + xdscommon.ClusterType: newDeltaType(generator, stream, xdscommon.ClusterType, func(kind structs.ServiceKind) bool { // Mesh, Ingress, and Terminating gateways are allowed to inform CDS of // no clusters. return cfgSnap.Kind == structs.ServiceKindMeshGateway || cfgSnap.Kind == structs.ServiceKindTerminatingGateway || cfgSnap.Kind == structs.ServiceKindIngressGateway }), - EndpointType: newDeltaType(generator, stream, EndpointType, nil), + xdscommon.EndpointType: newDeltaType(generator, stream, xdscommon.EndpointType, nil), } // Endpoints are stored within a Cluster (and Routes @@ -138,8 +139,8 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove // representation of envoy state to force an update. // // see: https://github.com/envoyproxy/envoy/issues/13009 - handlers[ListenerType].childType = handlers[RouteType] - handlers[ClusterType].childType = handlers[EndpointType] + handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType] + handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType] var authTimer <-chan time.Time extendAuthTimer := func() { @@ -332,18 +333,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove var xDSUpdateOrder = []xDSUpdateOperation{ // 1. CDS updates (if any) must always be pushed first. - {TypeUrl: ClusterType, Upsert: true}, + {TypeUrl: xdscommon.ClusterType, Upsert: true}, // 2. EDS updates (if any) must arrive after CDS updates for the respective clusters. - {TypeUrl: EndpointType, Upsert: true}, + {TypeUrl: xdscommon.EndpointType, Upsert: true}, // 3. LDS updates must arrive after corresponding CDS/EDS updates. - {TypeUrl: ListenerType, Upsert: true, Remove: true}, + {TypeUrl: xdscommon.ListenerType, Upsert: true, Remove: true}, // 4. RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates. - {TypeUrl: RouteType, Upsert: true, Remove: true}, + {TypeUrl: xdscommon.RouteType, Upsert: true, Remove: true}, // 5. (NOT IMPLEMENTED YET IN CONSUL) VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates. // {}, // 6. Stale CDS clusters and related EDS endpoints (ones no longer being referenced) can then be removed. - {TypeUrl: ClusterType, Remove: true}, - {TypeUrl: EndpointType, Remove: true}, + {TypeUrl: xdscommon.ClusterType, Remove: true}, + {TypeUrl: xdscommon.EndpointType, Remove: true}, // xDS updates can be pushed independently if no new // clusters/routes/listeners are added or if it’s acceptable to // temporarily drop traffic during updates. Note that in case of @@ -464,7 +465,7 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect { switch t.typeURL { - case ListenerType, ClusterType: + case xdscommon.ListenerType, xdscommon.ClusterType: if !t.wildcard { t.wildcard = true logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type") @@ -628,7 +629,7 @@ func (t *xDSDeltaType) nack(nonce string) { func (t *xDSDeltaType) SendIfNew( kind structs.ServiceKind, currentVersions map[string]string, // type => name => version (as consul knows right now) - resourceMap *IndexedResources, + resourceMap *xdscommon.IndexedResources, nonce *uint64, upsert, remove bool, ) (error, bool) { @@ -688,7 +689,7 @@ func (t *xDSDeltaType) SendIfNew( func (t *xDSDeltaType) createDeltaResponse( currentVersions map[string]string, // name => version (as consul knows right now) - resourceMap *IndexedResources, + resourceMap *xdscommon.IndexedResources, upsert, remove bool, ) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) { // compute difference @@ -797,7 +798,7 @@ func (t *xDSDeltaType) createDeltaResponse( return resp, realUpdates, nil } -func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[string]string, error) { +func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) { out := make(map[string]map[string]string) for typeUrl, resources := range resourceMap.Index { m, err := hashResourceMap(resources) @@ -809,52 +810,27 @@ func computeResourceVersions(resourceMap *IndexedResources) (map[string]map[stri return out, nil } -type IndexedResources struct { - // Index is a map of typeURL => resourceName => resource - Index map[string]map[string]proto.Message - - // ChildIndex is a map of typeURL => parentResourceName => list of - // childResourceNames. This only applies if the child and parent do not - // share a name. - ChildIndex map[string]map[string][]string -} - -func emptyIndexedResources() *IndexedResources { - return &IndexedResources{ - Index: map[string]map[string]proto.Message{ - ListenerType: make(map[string]proto.Message), - RouteType: make(map[string]proto.Message), - ClusterType: make(map[string]proto.Message), - EndpointType: make(map[string]proto.Message), - }, - ChildIndex: map[string]map[string][]string{ - ListenerType: make(map[string][]string), - ClusterType: make(map[string][]string), - }, - } -} - -func populateChildIndexMap(resourceMap *IndexedResources) error { +func populateChildIndexMap(resourceMap *xdscommon.IndexedResources) error { // LDS and RDS have a more complicated relationship. - for name, res := range resourceMap.Index[ListenerType] { + for name, res := range resourceMap.Index[xdscommon.ListenerType] { listener := res.(*envoy_listener_v3.Listener) rdsRouteNames, err := extractRdsResourceNames(listener) if err != nil { return err } - resourceMap.ChildIndex[ListenerType][name] = rdsRouteNames + resourceMap.ChildIndex[xdscommon.ListenerType][name] = rdsRouteNames } // CDS and EDS share exact names. - for name := range resourceMap.Index[ClusterType] { - resourceMap.ChildIndex[ClusterType][name] = []string{name} + for name := range resourceMap.Index[xdscommon.ClusterType] { + resourceMap.ChildIndex[xdscommon.ClusterType][name] = []string{name} } return nil } -func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources { - data := emptyIndexedResources() +func indexResources(logger hclog.Logger, resources map[string][]proto.Message) *xdscommon.IndexedResources { + data := xdscommon.EmptyIndexedResources() for typeURL, typeRes := range resources { for _, res := range typeRes { diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index de1e9e13a7..7e2ddeae13 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/xds/xdscommon" ) // NOTE: For these tests, prefer not using xDS protobuf "factory" methods if @@ -46,7 +47,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // Send initial cluster discover. We'll assume we are testing a partial // reconnect and include some initial resource versions that will be // cleaned up. - envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ InitialResourceVersions: mustMakeVersionMap(t, makeTestCluster(t, snap, "tcp:geo-cache"), ), @@ -63,7 +64,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { runStep(t, "first sync", func(t *testing.T) { assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -73,7 +74,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { }) // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ // We'll assume we are testing a partial "reconnect" InitialResourceVersions: mustMakeVersionMap(t, makeTestEndpoints(t, snap, "tcp:geo-cache"), @@ -89,14 +90,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we // deliver a new config version. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(2), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), @@ -109,14 +110,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(3), Resources: makeTestResources(t, makeTestListener(t, snap, "tcp:public_listener"), @@ -129,25 +130,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) // If we re-subscribe to something even if there are no changes we get a // fresh copy. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", }, }) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(4), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:geo-cache"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 4) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -159,7 +160,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { } runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesUnsubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", }, @@ -185,20 +186,20 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // and fix the subscription - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", }, }) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(5), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 5) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -247,7 +248,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { snap.Port = 1 // Send initial cluster discover. - envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -260,7 +261,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { runStep(t, "first sync", func(t *testing.T) { assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -270,7 +271,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { }) // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", @@ -278,14 +279,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we // deliver a new config version. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(2), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), @@ -297,14 +298,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(3), Resources: makeTestResources(t, // Response contains public_listener with port that Envoy can't bind to @@ -318,7 +319,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // NACKs the listener update due to the bad public listener - envoy.SendDeltaReqNACK(t, ListenerType, 3, &rpcstatus.Status{}) + envoy.SendDeltaReqNACK(t, xdscommon.ListenerType, 3, &rpcstatus.Status{}) // Consul should not respond until a new snapshot is delivered assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -331,7 +332,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { // And should send a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(4), Resources: makeTestResources(t, // Send a public listener that Envoy will accept @@ -342,7 +343,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { }) // New listener is acked now - envoy.SendDeltaReqACK(t, EndpointType, 4) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -370,7 +371,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { mgr.RegisterProxy(t, sid) // Send initial cluster discover (empty payload) - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -385,7 +386,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { runStep(t, "no-rds", func(t *testing.T) { assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -395,7 +396,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { }) // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", @@ -403,14 +404,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we // deliver a new config version. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(2), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "http2:db"), @@ -422,14 +423,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(3), Resources: makeTestResources(t, makeTestListener(t, snap, "tcp:public_listener"), @@ -442,7 +443,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -464,7 +465,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { runStep(t, "with-rds", func(t *testing.T) { // Just the "db" listener sees a change assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(4), Resources: makeTestResources(t, makeTestListener(t, snap, "http2:db:rds"), @@ -475,25 +476,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends routes request - envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db", }, }) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 4) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: RouteType, + TypeUrl: xdscommon.RouteType, Nonce: hexString(5), Resources: makeTestResources(t, makeTestRoute(t, "http2:db"), ), }) - envoy.SendDeltaReqACK(t, RouteType, 5) + envoy.SendDeltaReqACK(t, xdscommon.RouteType, 5) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -520,11 +521,11 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // This mutateFn causes any endpoint with a name containing "geo-cache" to be // omitted from the response while the hack is active. var slowHackDisabled uint32 - server.ResourceMapMutateFn = func(resourceMap *IndexedResources) { + server.ResourceMapMutateFn = func(resourceMap *xdscommon.IndexedResources) { if atomic.LoadUint32(&slowHackDisabled) == 1 { return } - if em, ok := resourceMap.Index[EndpointType]; ok { + if em, ok := resourceMap.Index[xdscommon.EndpointType]; ok { for k := range em { if strings.Contains(k, "geo-cache") { delete(em, k) @@ -543,7 +544,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) snap = newTestSnapshot(t, nil, "") // Send initial cluster discover. - envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -554,7 +555,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -564,7 +565,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) }) // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", @@ -572,7 +573,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server @@ -581,7 +582,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // // NOTE: we do NOT return back geo-cache yet assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(2), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), @@ -593,14 +594,14 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(3), Resources: makeTestResources(t, makeTestListener(t, snap, "tcp:public_listener"), @@ -613,7 +614,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) }) // Disable hack. Need to wait for one more event to wake up the loop. @@ -626,7 +627,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(4), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:geo-cache"), @@ -637,7 +638,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 4) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4) }) @@ -674,7 +675,7 @@ func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t // This is to simulate the discovery request call from envoy after disconnected from consul ads stream. // // We need to force it to be an older version of envoy so that the logic shifts. - envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "local_app", "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", @@ -697,7 +698,7 @@ func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -734,7 +735,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa snap = newTestSnapshot(t, nil, "") // Send initial cluster discover. - envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{}) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -745,7 +746,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -755,7 +756,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa }) // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", @@ -763,14 +764,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we // deliver a new config version. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(2), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), @@ -782,14 +783,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(3), Resources: makeTestResources(t, makeTestListener(t, snap, "tcp:public_listener"), @@ -802,7 +803,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) }) runStep(t, "trigger cluster update needing implicit endpoint replacements", func(t *testing.T) { @@ -816,7 +817,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // The cluster is updated assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(4), Resources: makeTestResources(t, // SAME makeTestCluster(t, snap, "tcp:local_app"), @@ -825,19 +826,19 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa ), }) - envoy.SendDeltaReqACK(t, ClusterType, 4) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4) // And we re-send the endpoints for the updated cluster after getting the // ACK for the cluster. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(5), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "tcp:db"), // SAME makeTestEndpoints(t, snap, "tcp:geo-cache"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 5) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5) // And no other response yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -869,7 +870,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan runStep(t, "get into initial state", func(t *testing.T) { // Send initial cluster discover (empty payload) - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -887,7 +888,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -897,7 +898,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }) // Envoy then tries to discover endpoints for those clusters. - envoy.SendDeltaReq(t, EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul", @@ -905,14 +906,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }) // It also (in parallel) issues the cluster ACK - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // We should get a response immediately since the config is already present in // the server for endpoints. Note that this should not be racy if the server // is behaving well since the Cluster send above should be blocked until we // deliver a new config version. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(2), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "http2:db"), @@ -924,14 +925,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends listener request - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // It also (in parallel) issues the endpoint ACK - envoy.SendDeltaReqACK(t, EndpointType, 2) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 2) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(3), Resources: makeTestResources(t, makeTestListener(t, snap, "tcp:public_listener"), @@ -944,25 +945,25 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // Envoy now sends routes request - envoy.SendDeltaReq(t, RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + envoy.SendDeltaReq(t, xdscommon.RouteType, &envoy_discovery_v3.DeltaDiscoveryRequest{ ResourceNamesSubscribe: []string{ "db", }, }) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 3) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3) // And should get a response immediately. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: RouteType, + TypeUrl: xdscommon.RouteType, Nonce: hexString(4), Resources: makeTestResources(t, makeTestRoute(t, "http2:db"), ), }) - envoy.SendDeltaReqACK(t, RouteType, 4) + envoy.SendDeltaReqACK(t, xdscommon.RouteType, 4) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -984,14 +985,14 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // db cluster is refreshed (unrelated to the test scenario other than it's required) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(5), Resources: makeTestResources(t, makeTestCluster(t, snap, "http:db"), ), }) - envoy.SendDeltaReqACK(t, ClusterType, 5) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 5) // The behaviors of Cluster updates triggering re-sends of Endpoint updates // tested in TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints @@ -999,18 +1000,18 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // this exchange to get to the part we care about. assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: EndpointType, + TypeUrl: xdscommon.EndpointType, Nonce: hexString(6), Resources: makeTestResources(t, makeTestEndpoints(t, snap, "http:db"), ), }) - envoy.SendDeltaReqACK(t, EndpointType, 6) + envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 6) // the listener is updated assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(7), Resources: makeTestResources(t, makeTestListener(t, snap, "http:db:rds"), @@ -1018,18 +1019,18 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan }) // ACKs the listener - envoy.SendDeltaReqACK(t, ListenerType, 7) + envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7) // THE ACTUAL THING WE CARE ABOUT: replaced route config assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: RouteType, + TypeUrl: xdscommon.RouteType, Nonce: hexString(8), Resources: makeTestResources(t, makeTestRoute(t, "http2:db"), ), }) - envoy.SendDeltaReqACK(t, RouteType, 8) + envoy.SendDeltaReqACK(t, xdscommon.RouteType, 8) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) @@ -1134,11 +1135,11 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { // Send initial listener discover, in real life Envoy always sends cluster // first but it doesn't really matter and listener has a response that // includes the token in the ext rbac filter so lets us test more stuff. - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) if !tt.wantDenied { assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestListener(t, snap, "tcp:public_listener"), @@ -1208,7 +1209,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri mgr.RegisterProxy(t, sid) // Send initial cluster discover (OK) - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) { err, ok := getError() require.NoError(t, err) @@ -1228,7 +1229,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -1239,7 +1240,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri // It also (in parallel) issues the next cluster request (which acts as an ACK // of the version we sent) - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -1306,7 +1307,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa mgr.RegisterProxy(t, sid) // Send initial cluster discover (OK) - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) { err, ok := getError() require.NoError(t, err) @@ -1326,7 +1327,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa mgr.DeliverConfig(t, sid, snap) assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), Resources: makeTestResources(t, makeTestCluster(t, snap, "tcp:local_app"), @@ -1337,7 +1338,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa // It also (in parallel) issues the next cluster request (which acts as an ACK // of the version we sent) - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -1378,7 +1379,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { mgr.RegisterProxy(t, sid) // Send initial cluster discover - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) // Check no response sent yet assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) @@ -1388,25 +1389,25 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { mgr.DeliverConfig(t, sid, snap) // REQ: clusters - envoy.SendDeltaReq(t, ClusterType, nil) + envoy.SendDeltaReq(t, xdscommon.ClusterType, nil) // RESP: cluster assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ClusterType, + TypeUrl: xdscommon.ClusterType, Nonce: hexString(1), }) assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) // ACK: clusters - envoy.SendDeltaReqACK(t, ClusterType, 1) + envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1) // REQ: listeners - envoy.SendDeltaReq(t, ListenerType, nil) + envoy.SendDeltaReq(t, xdscommon.ListenerType, nil) // RESP: listeners assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ - TypeUrl: ListenerType, + TypeUrl: xdscommon.ListenerType, Nonce: hexString(2), }) diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index 62aad1e112..ca40d2cdb0 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/proxysupport" + "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/sdk/testutil" ) @@ -512,7 +513,7 @@ func TestEndpointsFromSnapshot(t *testing.T) { sort.Slice(endpoints, func(i, j int) bool { return endpoints[i].(*envoy_endpoint_v3.ClusterLoadAssignment).ClusterName < endpoints[j].(*envoy_endpoint_v3.ClusterLoadAssignment).ClusterName }) - r, err := createResponse(EndpointType, "00000001", "00000001", endpoints) + r, err := createResponse(xdscommon.EndpointType, "00000001", "00000001", endpoints) require.NoError(t, err) t.Run("current", func(t *testing.T) { diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index 5a3a018326..71c5a09fef 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/proxysupport" + "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/types" ) @@ -667,7 +668,7 @@ func TestListenersFromSnapshot(t *testing.T) { return listeners[i].(*envoy_listener_v3.Listener).Name < listeners[j].(*envoy_listener_v3.Listener).Name }) - r, err := createResponse(ListenerType, "00000001", "00000001", listeners) + r, err := createResponse(xdscommon.ListenerType, "00000001", "00000001", listeners) require.NoError(t, err) t.Run("current", func(t *testing.T) { diff --git a/agent/xds/resources.go b/agent/xds/resources.go index dbede72189..c2bb43b3a3 100644 --- a/agent/xds/resources.go +++ b/agent/xds/resources.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/xds/xdscommon" ) // ResourceGenerator is associated with a single gRPC stream and creates xDS @@ -36,7 +37,7 @@ func newResourceGenerator( func (g *ResourceGenerator) allResourcesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) (map[string][]proto.Message, error) { all := make(map[string][]proto.Message) - for _, typeUrl := range []string{ListenerType, RouteType, ClusterType, EndpointType} { + for _, typeUrl := range []string{xdscommon.ListenerType, xdscommon.RouteType, xdscommon.ClusterType, xdscommon.EndpointType} { res, err := g.resourcesFromSnapshot(typeUrl, cfgSnap) if err != nil { return nil, fmt.Errorf("failed to generate xDS resources for %q: %v", typeUrl, err) @@ -48,13 +49,13 @@ func (g *ResourceGenerator) allResourcesFromSnapshot(cfgSnap *proxycfg.ConfigSna func (g *ResourceGenerator) resourcesFromSnapshot(typeUrl string, cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { switch typeUrl { - case ListenerType: + case xdscommon.ListenerType: return g.listenersFromSnapshot(cfgSnap) - case RouteType: + case xdscommon.RouteType: return g.routesFromSnapshot(cfgSnap) - case ClusterType: + case xdscommon.ClusterType: return g.clustersFromSnapshot(cfgSnap) - case EndpointType: + case xdscommon.EndpointType: return g.endpointsFromSnapshot(cfgSnap) default: return nil, fmt.Errorf("unknown typeUrl: %s", typeUrl) diff --git a/agent/xds/routes_test.go b/agent/xds/routes_test.go index 7284a715d8..ad033faadb 100644 --- a/agent/xds/routes_test.go +++ b/agent/xds/routes_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/proxysupport" + "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/sdk/testutil" ) @@ -201,7 +202,7 @@ func TestRoutesFromSnapshot(t *testing.T) { sort.Slice(routes, func(i, j int) bool { return routes[i].(*envoy_route_v3.RouteConfiguration).Name < routes[j].(*envoy_route_v3.RouteConfiguration).Name }) - r, err := createResponse(RouteType, "00000001", "00000001", routes) + r, err := createResponse(xdscommon.RouteType, "00000001", "00000001", routes) require.NoError(t, err) t.Run("current", func(t *testing.T) { diff --git a/agent/xds/server.go b/agent/xds/server.go index d0e1934ad3..4ebbd088b5 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -23,6 +23,7 @@ import ( agentgrpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/agent/xds/xdscommon" "github.com/hashicorp/consul/tlsutil" ) @@ -37,23 +38,6 @@ var StatsGauges = []prometheus.GaugeDefinition{ type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer const ( - // Resource types in xDS v3. These are copied from - // envoyproxy/go-control-plane/pkg/resource/v3/resource.go since we don't need any of - // the rest of that package. - apiTypePrefix = "type.googleapis.com/" - - // EndpointType is the TypeURL for Endpoint discovery responses. - EndpointType = apiTypePrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment" - - // ClusterType is the TypeURL for Cluster discovery responses. - ClusterType = apiTypePrefix + "envoy.config.cluster.v3.Cluster" - - // RouteType is the TypeURL for Route discovery responses. - RouteType = apiTypePrefix + "envoy.config.route.v3.RouteConfiguration" - - // ListenerType is the TypeURL for Listener discovery responses. - ListenerType = apiTypePrefix + "envoy.config.listener.v3.Listener" - // PublicListenerName is the name we give the public listener in Envoy config. PublicListenerName = "public_listener" @@ -145,7 +129,7 @@ type Server struct { AuthCheckFrequency time.Duration // ResourceMapMutateFn exclusively exists for testing purposes. - ResourceMapMutateFn func(resourceMap *IndexedResources) + ResourceMapMutateFn func(resourceMap *xdscommon.IndexedResources) activeStreams *activeStreamCounters } diff --git a/agent/xds/xdscommon/xdscommon.go b/agent/xds/xdscommon/xdscommon.go new file mode 100644 index 0000000000..2c373e2982 --- /dev/null +++ b/agent/xds/xdscommon/xdscommon.go @@ -0,0 +1,49 @@ +package xdscommon + +import ( + "github.com/golang/protobuf/proto" +) + +const ( + // Resource types in xDS v3. These are copied from + // envoyproxy/go-control-plane/pkg/resource/v3/resource.go since we don't need any of + // the rest of that package. + apiTypePrefix = "type.googleapis.com/" + + // EndpointType is the TypeURL for Endpoint discovery responses. + EndpointType = apiTypePrefix + "envoy.config.endpoint.v3.ClusterLoadAssignment" + + // ClusterType is the TypeURL for Cluster discovery responses. + ClusterType = apiTypePrefix + "envoy.config.cluster.v3.Cluster" + + // RouteType is the TypeURL for Route discovery responses. + RouteType = apiTypePrefix + "envoy.config.route.v3.RouteConfiguration" + + // ListenerType is the TypeURL for Listener discovery responses. + ListenerType = apiTypePrefix + "envoy.config.listener.v3.Listener" +) + +type IndexedResources struct { + // Index is a map of typeURL => resourceName => resource + Index map[string]map[string]proto.Message + + // ChildIndex is a map of typeURL => parentResourceName => list of + // childResourceNames. This only applies if the child and parent do not + // share a name. + ChildIndex map[string]map[string][]string +} + +func EmptyIndexedResources() *IndexedResources { + return &IndexedResources{ + Index: map[string]map[string]proto.Message{ + ListenerType: make(map[string]proto.Message), + RouteType: make(map[string]proto.Message), + ClusterType: make(map[string]proto.Message), + EndpointType: make(map[string]proto.Message), + }, + ChildIndex: map[string]map[string][]string{ + ListenerType: make(map[string][]string), + ClusterType: make(map[string][]string), + }, + } +}