mirror of https://github.com/hashicorp/consul
Avoid blocking child type updates on parent ack (#15083)
parent
c064ddf606
commit
7f5f7e9cf9
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
connect: fixed bug where endpoint updates for new xDS clusters could block for 15s before being sent to Envoy.
|
||||
```
|
|
@ -4,7 +4,6 @@ import (
|
|||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -158,8 +157,14 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
// representation of envoy state to force an update.
|
||||
//
|
||||
// see: https://github.com/envoyproxy/envoy/issues/13009
|
||||
handlers[xdscommon.ListenerType].childType = handlers[xdscommon.RouteType]
|
||||
handlers[xdscommon.ClusterType].childType = handlers[xdscommon.EndpointType]
|
||||
handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
|
||||
childType: handlers[xdscommon.RouteType],
|
||||
childrenNames: make(map[string][]string),
|
||||
}
|
||||
handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
|
||||
childType: handlers[xdscommon.EndpointType],
|
||||
childrenNames: make(map[string][]string),
|
||||
}
|
||||
|
||||
var authTimer <-chan time.Time
|
||||
extendAuthTimer := func() {
|
||||
|
@ -346,22 +351,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
continue
|
||||
}
|
||||
|
||||
var pendingTypes []string
|
||||
for typeUrl, handler := range handlers {
|
||||
if !handler.registered {
|
||||
continue
|
||||
}
|
||||
if len(handler.pendingUpdates) > 0 {
|
||||
pendingTypes = append(pendingTypes, typeUrl)
|
||||
}
|
||||
}
|
||||
if len(pendingTypes) > 0 {
|
||||
sort.Strings(pendingTypes)
|
||||
generator.Logger.Trace("Skipping delta computation because there are responses in flight",
|
||||
"pendingTypeUrls", pendingTypes)
|
||||
continue
|
||||
}
|
||||
|
||||
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
|
||||
|
||||
streamStartOnce.Do(func() {
|
||||
|
@ -369,7 +358,25 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
})
|
||||
|
||||
for _, op := range xDSUpdateOrder {
|
||||
err, sent := handlers[op.TypeUrl].SendIfNew(
|
||||
if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
|
||||
if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
|
||||
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
|
||||
"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)
|
||||
|
||||
// Receiving an ACK from Envoy will unblock the select statement above,
|
||||
// and re-trigger an attempt to send these skipped updates.
|
||||
break
|
||||
}
|
||||
if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
|
||||
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
|
||||
"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)
|
||||
|
||||
// Receiving an ACK from Envoy will unblock the select statement above,
|
||||
// and re-trigger an attempt to send these skipped updates.
|
||||
break
|
||||
}
|
||||
}
|
||||
err, _ := handlers[op.TypeUrl].SendIfNew(
|
||||
cfgSnap.Kind,
|
||||
currentVersions[op.TypeUrl],
|
||||
resourceMap,
|
||||
|
@ -383,9 +390,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
op.errorLogNameReplyPrefix(),
|
||||
op.TypeUrl, err)
|
||||
}
|
||||
if sent {
|
||||
break // wait until we get an ACK to do more
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -435,16 +439,26 @@ func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
|
|||
}
|
||||
}
|
||||
|
||||
type xDSDeltaChild struct {
|
||||
// childType is a type that in Envoy is actually stored within this type.
|
||||
// Upserts of THIS type should potentially trigger dependent named
|
||||
// resources within the child to be re-configured.
|
||||
childType *xDSDeltaType
|
||||
|
||||
// childrenNames is map of parent resource names to a list of associated child resource
|
||||
// names.
|
||||
childrenNames map[string][]string
|
||||
}
|
||||
|
||||
type xDSDeltaType struct {
|
||||
generator *ResourceGenerator
|
||||
stream ADSDeltaStream
|
||||
typeURL string
|
||||
allowEmptyFn func(kind structs.ServiceKind) bool
|
||||
|
||||
// childType is a type that in Envoy is actually stored within this type.
|
||||
// Upserts of THIS type should potentially trigger dependent named
|
||||
// resources within the child to be re-configured.
|
||||
childType *xDSDeltaType
|
||||
// deltaChild contains data for an xDS child type if there is one.
|
||||
// For example, endpoints are a child type of clusters.
|
||||
deltaChild *xDSDeltaChild
|
||||
|
||||
// registered indicates if this type has been requested at least once by
|
||||
// the proxy
|
||||
|
@ -484,9 +498,8 @@ func (t *xDSDeltaType) subscribed(name string) bool {
|
|||
}
|
||||
|
||||
type PendingUpdate struct {
|
||||
Remove bool
|
||||
Version string
|
||||
ChildResources []string // optional
|
||||
Remove bool
|
||||
Version string
|
||||
}
|
||||
|
||||
func newDeltaType(
|
||||
|
@ -610,6 +623,15 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf su
|
|||
t.resourceVersions[name] = ""
|
||||
}
|
||||
|
||||
// Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent.
|
||||
// We MUST assume that if Envoy ever had data for the children of this parent, then the child's
|
||||
// data is gone.
|
||||
if t.deltaChild != nil && t.deltaChild.childType.registered {
|
||||
for _, childName := range t.deltaChild.childrenNames[name] {
|
||||
t.ensureChildResend(name, childName)
|
||||
}
|
||||
}
|
||||
|
||||
if alreadySubscribed {
|
||||
logger.Trace("re-subscribing resource for stream", "resource", name)
|
||||
} else {
|
||||
|
@ -646,27 +668,6 @@ func (t *xDSDeltaType) ack(nonce string) {
|
|||
}
|
||||
|
||||
t.resourceVersions[name] = obj.Version
|
||||
if t.childType != nil {
|
||||
// This branch only matters on UPDATE, since we already have
|
||||
// mechanisms to clean up orphaned resources.
|
||||
for _, childName := range obj.ChildResources {
|
||||
if _, exist := t.childType.resourceVersions[childName]; !exist {
|
||||
continue
|
||||
}
|
||||
if !t.subscribed(childName) {
|
||||
continue
|
||||
}
|
||||
t.generator.Logger.Trace(
|
||||
"triggering implicit update of resource",
|
||||
"typeUrl", t.typeURL,
|
||||
"resource", name,
|
||||
"childTypeUrl", t.childType.typeURL,
|
||||
"childResource", childName,
|
||||
)
|
||||
// Basically manifest this as a re-subscribe/re-sync
|
||||
t.childType.resourceVersions[childName] = ""
|
||||
}
|
||||
}
|
||||
}
|
||||
t.sentToEnvoyOnce = true
|
||||
delete(t.pendingUpdates, nonce)
|
||||
|
@ -686,6 +687,12 @@ func (t *xDSDeltaType) SendIfNew(
|
|||
if t == nil || !t.registered {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Wait for Envoy to catch up with this delta type before sending something new.
|
||||
if len(t.pendingUpdates) > 0 {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
logger := t.generator.Logger.With("typeUrl", t.typeURL)
|
||||
|
||||
allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind)
|
||||
|
@ -721,14 +728,23 @@ func (t *xDSDeltaType) SendIfNew(
|
|||
}
|
||||
logger.Trace("sent response", "nonce", resp.Nonce)
|
||||
|
||||
if t.childType != nil {
|
||||
// Capture the relevant child resource names on this pending update so
|
||||
// we can properly clean up the linked children when this change is
|
||||
// ACKed.
|
||||
for name, obj := range updates {
|
||||
// Certain xDS types are children of other types, meaning that if an update is pushed for a parent,
|
||||
// we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon
|
||||
// receiving updates for the parent, so we need to handle this ourselves.
|
||||
//
|
||||
// Note that we do not check whether the deltaChild.childType is registered here, since we send
|
||||
// parent types before child types, meaning that it's expected on first send of a parent that
|
||||
// there are no subscriptions for the child type.
|
||||
if t.deltaChild != nil {
|
||||
for name := range updates {
|
||||
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
|
||||
obj.ChildResources = children
|
||||
updates[name] = obj
|
||||
// Capture the relevant child resource names on this pending update so
|
||||
// we can know the linked children if Envoy ever re-subscribes to the parent resource.
|
||||
t.deltaChild.childrenNames[name] = children
|
||||
|
||||
for _, childName := range children {
|
||||
t.ensureChildResend(name, childName)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -848,6 +864,28 @@ func (t *xDSDeltaType) createDeltaResponse(
|
|||
return resp, realUpdates, nil
|
||||
}
|
||||
|
||||
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
|
||||
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
|
||||
return
|
||||
}
|
||||
if !t.subscribed(childName) {
|
||||
return
|
||||
}
|
||||
|
||||
t.generator.Logger.Trace(
|
||||
"triggering implicit update of resource",
|
||||
"typeUrl", t.typeURL,
|
||||
"resource", parentName,
|
||||
"childTypeUrl", t.deltaChild.childType.typeURL,
|
||||
"childResource", childName,
|
||||
)
|
||||
|
||||
// resourceVersions tracks the last known version for this childName that Envoy
|
||||
// has ACKed. By setting this to empty it effectively tells us that Envoy does
|
||||
// not have any data for that child, and we need to re-send.
|
||||
t.deltaChild.childType.resourceVersions[childName] = ""
|
||||
}
|
||||
|
||||
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
|
||||
out := make(map[string]map[string]string)
|
||||
for typeUrl, resources := range resourceMap.Index {
|
||||
|
|
|
@ -95,9 +95,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
|
@ -112,7 +109,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// After receiving the endpoints Envoy sends an ACK for the cluster
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
|
@ -132,13 +132,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
|
||||
|
||||
// If we re-subscribe to something even if there are no changes we get a
|
||||
// If Envoy re-subscribes to something even if there are no changes we send a
|
||||
// fresh copy.
|
||||
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
ResourceNamesSubscribe: []string{
|
||||
|
@ -156,7 +156,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
|
||||
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 4)
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
|
@ -174,24 +174,24 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB
|
||||
// now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for db.
|
||||
snap = newTestSnapshot(t, snap, "")
|
||||
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
// We never send an EDS reply about this change.
|
||||
// We never send an EDS reply about this change because Envoy is not subscribed to db anymore.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "restore endpoint subscription", func(t *testing.T) {
|
||||
// Fix the snapshot
|
||||
// Restore db's deleted endpoints by generating a new snapshot.
|
||||
snap = newTestSnapshot(t, snap, "")
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
|
||||
// We never send an EDS reply about this change.
|
||||
// We never send an EDS reply about this change because Envoy is still not subscribed to db.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// and fix the subscription
|
||||
// When Envoy re-subscribes to db we send the endpoints for it.
|
||||
envoy.SendDeltaReq(t, xdscommon.EndpointType, &envoy_discovery_v3.DeltaDiscoveryRequest{
|
||||
ResourceNamesSubscribe: []string{
|
||||
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
|
||||
|
@ -215,9 +215,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|||
// Force sends to fail
|
||||
envoy.SetSendErr(errors.New("test error"))
|
||||
|
||||
// Trigger only an EDS update (flipping BACK to 2 endpoints in the LBassignment)
|
||||
snap = newTestSnapshot(t, snap, "")
|
||||
mgr.DeliverConfig(t, sid, snap)
|
||||
// Trigger only an EDS update by deleting endpoints again.
|
||||
deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1")
|
||||
|
||||
// We never send any replies about this change because we died.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
@ -267,7 +266,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
|
|||
mgr.DeliverConfig(t, sid, snap)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "first sync", func(t *testing.T) {
|
||||
testutil.RunStep(t, "simulate Envoy NACKing initial listener", func(t *testing.T) {
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: xdscommon.ClusterType,
|
||||
Nonce: hexString(1),
|
||||
|
@ -286,9 +285,6 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
|
@ -302,7 +298,10 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// After receiving the endpoints Envoy sends an ACK for the clusters
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
|
@ -323,13 +322,14 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// NACKs the listener update due to the bad public listener
|
||||
// Envoy NACKs the listener update due to the bad public listener
|
||||
envoy.SendDeltaReqNACK(t, xdscommon.ListenerType, 3, &rpcstatus.Status{})
|
||||
|
||||
// Consul should not respond until a new snapshot is delivered
|
||||
// because the current snapshot is known to be bad.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
|
@ -411,9 +411,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
|
@ -427,7 +424,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// After receiving the endpoints Envoy sends an ACK for the clusters
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
|
@ -447,13 +447,13 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
|
@ -480,7 +480,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends routes request
|
||||
|
@ -490,9 +490,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4)
|
||||
|
||||
// And should get a response immediately.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: xdscommon.RouteType,
|
||||
|
@ -502,6 +499,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
|
|||
),
|
||||
})
|
||||
|
||||
// After receiving the routes, Envoy sends acks back for the listener and routes.
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 4)
|
||||
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 5)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
@ -580,9 +579,6 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
|
|||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
|
@ -598,7 +594,14 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// After receiving the endpoints Envoy sends an ACK for the clusters.
|
||||
// Envoy aims to wait to receive endpoints before ACKing clusters,
|
||||
// but because it received an update for at least one of the clusters it cares about
|
||||
// then it will ACK despite not having received an update for all clusters.
|
||||
// This behavior was observed against Envoy v1.21 and v1.23.
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
|
@ -618,7 +621,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
|
@ -642,7 +645,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// It also (in parallel) issues the endpoint ACK
|
||||
|
@ -705,9 +708,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
|||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
|
@ -721,7 +721,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// After receiving the endpoints Envoy sends an ACK for the clusters
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
|
@ -741,7 +744,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// ACKs the listener
|
||||
|
@ -768,8 +771,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
|||
),
|
||||
})
|
||||
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4)
|
||||
|
||||
// And we re-send the endpoints for the updated cluster after getting the
|
||||
// ACK for the cluster.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
|
@ -780,9 +781,12 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
|
|||
// SAME makeTestEndpoints(t, snap, "tcp:geo-cache"),
|
||||
),
|
||||
})
|
||||
|
||||
// Envoy then ACK's the clusters and the endpoints.
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 4)
|
||||
envoy.SendDeltaReqACK(t, xdscommon.EndpointType, 5)
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
})
|
||||
|
||||
|
@ -847,9 +851,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
},
|
||||
})
|
||||
|
||||
// It also (in parallel) issues the cluster ACK
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We should get a response immediately since the config is already present in
|
||||
// the server for endpoints. Note that this should not be racy if the server
|
||||
// is behaving well since the Cluster send above should be blocked until we
|
||||
|
@ -863,7 +864,10 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// After receiving the endpoints Envoy sends an ACK for the clusters
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ClusterType, 1)
|
||||
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends listener request
|
||||
|
@ -883,7 +887,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
),
|
||||
})
|
||||
|
||||
// And no other response yet
|
||||
// We are caught up, so there should be nothing queued to send.
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
||||
// Envoy now sends routes request
|
||||
|
@ -893,9 +897,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
},
|
||||
})
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
|
||||
|
||||
// And should get a response immediately.
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: xdscommon.RouteType,
|
||||
|
@ -905,6 +906,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
),
|
||||
})
|
||||
|
||||
// After receiving the routes, Envoy sends acks back for the listener and routes.
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 3)
|
||||
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 4)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
@ -960,9 +963,6 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
),
|
||||
})
|
||||
|
||||
// ACKs the listener
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7)
|
||||
|
||||
// THE ACTUAL THING WE CARE ABOUT: replaced route config
|
||||
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
|
||||
TypeUrl: xdscommon.RouteType,
|
||||
|
@ -972,6 +972,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
|
|||
),
|
||||
})
|
||||
|
||||
// After receiving the routes, Envoy sends acks back for the listener and routes.
|
||||
envoy.SendDeltaReqACK(t, xdscommon.ListenerType, 7)
|
||||
envoy.SendDeltaReqACK(t, xdscommon.RouteType, 8)
|
||||
|
||||
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
|
||||
|
|
|
@ -650,10 +650,6 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
|
|||
primaryTargetID := node.Resolver.Target
|
||||
failover := node.Resolver.Failover
|
||||
|
||||
type targetLoadAssignmentOption struct {
|
||||
targetID string
|
||||
clusterName string
|
||||
}
|
||||
var targetsClustersData []targetClusterData
|
||||
|
||||
var numFailoverTargets int
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
const (
|
||||
envoyEnvKey = "ENVOY_VERSION"
|
||||
envoyLogLevel = "info"
|
||||
envoyLogLevel = "debug"
|
||||
envoyVersion = "1.23.1"
|
||||
|
||||
hashicorpDockerProxy = "docker.mirror.hashicorp.services"
|
||||
|
|
|
@ -64,7 +64,7 @@ func (c ConnectContainer) Terminate() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (Service, error) {
|
||||
func NewConnectService(ctx context.Context, name string, serviceName string, serviceBindPort int, node libnode.Agent) (*ConnectContainer, error) {
|
||||
namePrefix := fmt.Sprintf("%s-service-connect-%s", node.GetDatacenter(), name)
|
||||
containerName := utils.RandName(namePrefix)
|
||||
|
||||
|
|
|
@ -41,10 +41,12 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi
|
|||
Name: "Connect Sidecar Listening",
|
||||
TCP: fmt.Sprintf("%s:%d", serverConnectProxyIP, 20000),
|
||||
Interval: "10s",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
&api.AgentServiceCheck{
|
||||
Name: "Connect Sidecar Aliasing Static Server",
|
||||
AliasService: "static-server",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
Proxy: &api.AgentServiceConnectProxyConfig{
|
||||
|
@ -55,9 +57,10 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi
|
|||
},
|
||||
},
|
||||
Check: &api.AgentServiceCheck{
|
||||
Name: "Connect Sidecar Listening",
|
||||
Name: "Static Server Listening",
|
||||
TCP: fmt.Sprintf("%s:%d", serverServiceIP, 8080),
|
||||
Interval: "10s",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -69,7 +72,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libnode.Agent) (Service, Servi
|
|||
return serverService, serverConnectProxy, nil
|
||||
}
|
||||
|
||||
func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, localMeshGateway bool) (Service, error) {
|
||||
func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, localMeshGateway bool) (*ConnectContainer, error) {
|
||||
// Create a service and proxy instance
|
||||
clientConnectProxy, err := NewConnectService(context.Background(), "static-client-sidecar", "static-client", 5000, node)
|
||||
if err != nil {
|
||||
|
@ -97,6 +100,7 @@ func CreateAndRegisterStaticClientSidecar(node libnode.Agent, peerName string, l
|
|||
Name: "Connect Sidecar Listening",
|
||||
TCP: fmt.Sprintf("%s:%d", clientConnectProxyIP, 20000),
|
||||
Interval: "10s",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
Proxy: &api.AgentServiceConnectProxyConfig{
|
||||
|
|
Loading…
Reference in New Issue