consul/agent/xds/delta.go

1119 lines
36 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package xds
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/hashicorp/go-hclog"
goversion "github.com/hashicorp/go-version"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/envoyextensions"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/extensionruntime"
"github.com/hashicorp/consul/envoyextensions/extensioncommon"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/version"
)
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
type deltaRecvResponse int
const (
deltaRecvResponseNack deltaRecvResponse = iota
deltaRecvResponseAck
deltaRecvNewSubscription
deltaRecvUnknownType
)
// ADSDeltaStream is a shorter way of referring to this thing...
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer
// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
defer s.activeStreams.Increment(stream.Context())()
// a channel for receiving incoming requests
reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)
reqStop := int32(0)
go func() {
for {
req, err := stream.Recv()
if atomic.LoadInt32(&reqStop) != 0 {
return
}
if err != nil {
s.Logger.Error("Error receiving new DeltaDiscoveryRequest; closing request channel", "error", err)
close(reqCh)
return
}
reqCh <- req
}
}()
err := s.processDelta(stream, reqCh)
if err != nil {
s.Logger.Error("Error handling ADS delta stream", "xdsVersion", "v3", "error", err)
}
// prevents writing to a closed channel if send failed on blocked recv
atomic.StoreInt32(&reqStop, 1)
return err
}
const (
stateDeltaInit int = iota
stateDeltaPendingInitialConfig
stateDeltaRunning
)
func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error {
// Handle invalid ACL tokens up-front.
if _, err := s.authenticate(stream.Context()); err != nil {
return err
}
// Loop state
var (
cfgSnap *proxycfg.ConfigSnapshot
node *envoy_config_core_v3.Node
stateCh <-chan *proxycfg.ConfigSnapshot
drainCh limiter.SessionTerminatedChan
watchCancel func()
proxyID structs.ServiceID
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
ready bool // set to true after the first snapshot arrives
streamStartTime = time.Now()
streamStartOnce sync.Once
)
var (
// resourceMap is the SoTW we are incrementally attempting to sync to envoy.
//
// type => name => proto
resourceMap = xdscommon.EmptyIndexedResources()
// currentVersions is the xDS versioning represented by Resources.
//
// type => name => version (as consul knows right now)
currentVersions = make(map[string]map[string]string)
)
generator := NewResourceGenerator(
s.Logger.Named(logging.XDS).With("xdsVersion", "v3"),
s.CfgFetcher,
true,
)
// need to run a small state machine to get through initial authentication.
var state = stateDeltaInit
// Configure handlers for each type of request we currently care about.
handlers := map[string]*xDSDeltaType{
xdscommon.ListenerType: newDeltaType(generator, stream, xdscommon.ListenerType, func(kind structs.ServiceKind) bool {
// Ingress and API gateways are allowed to inform LDS of no listeners.
return cfgSnap.Kind == structs.ServiceKindIngressGateway ||
cfgSnap.Kind == structs.ServiceKindAPIGateway
}),
xdscommon.RouteType: newDeltaType(generator, stream, xdscommon.RouteType, func(kind structs.ServiceKind) bool {
// Ingress and API gateways are allowed to inform RDS of no routes.
return cfgSnap.Kind == structs.ServiceKindIngressGateway ||
cfgSnap.Kind == structs.ServiceKindAPIGateway
}),
xdscommon.ClusterType: newDeltaType(generator, stream, xdscommon.ClusterType, func(kind structs.ServiceKind) bool {
// Mesh, Ingress, API and Terminating gateways are allowed to inform CDS of no clusters.
return cfgSnap.Kind == structs.ServiceKindMeshGateway ||
cfgSnap.Kind == structs.ServiceKindTerminatingGateway ||
cfgSnap.Kind == structs.ServiceKindIngressGateway ||
cfgSnap.Kind == structs.ServiceKindAPIGateway
}),
xdscommon.EndpointType: newDeltaType(generator, stream, xdscommon.EndpointType, nil),
xdscommon.SecretType: newDeltaType(generator, stream, xdscommon.SecretType, nil), // TODO allowEmptyFn
}
// Endpoints are stored within a Cluster (and Routes
// are stored within a Listener) so whenever the
// enclosing resource is updated the inner resource
// list is cleared implicitly.
//
// When this happens we should update our local
// representation of envoy state to force an update.
//
// see: https://github.com/envoyproxy/envoy/issues/13009
handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.RouteType],
childrenNames: make(map[string][]string),
}
handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
childType: handlers[xdscommon.EndpointType],
childrenNames: make(map[string][]string),
}
var authTimer <-chan time.Time
extendAuthTimer := func() {
authTimer = time.After(s.AuthCheckFrequency)
}
checkStreamACLs := func(cfgSnap *proxycfg.ConfigSnapshot) error {
return s.authorize(stream.Context(), cfgSnap)
}
for {
select {
case <-drainCh:
generator.Logger.Debug("draining stream to rebalance load")
metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
return errOverwhelmed
case <-authTimer:
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
extendAuthTimer()
case req, ok := <-reqCh:
if !ok {
// reqCh is closed when stream.Recv errors which is how we detect client
// going away. AFAICT the stream.Context() is only canceled once the
// RPC method returns which it can't until we return from this one so
// there's no point in blocking on that.
return nil
}
generator.logTraceRequest("Incremental xDS v3", req)
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
if node == nil && req.Node != nil {
node = req.Node
var err error
generator.ProxyFeatures, err = xdscommon.DetermineSupportedProxyFeatures(req.Node)
if err != nil {
return status.Errorf(codes.InvalidArgument, err.Error())
}
}
if handler, ok := handlers[req.TypeUrl]; ok {
switch handler.Recv(req, generator.ProxyFeatures) {
case deltaRecvNewSubscription:
generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
case deltaRecvResponseNack:
generator.Logger.Trace("got nack response for type", "typeUrl", req.TypeUrl)
// There is no reason to believe that generating new xDS resources from the same snapshot
// would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait
// for a new request or snapshot.
continue
}
}
case cs, ok := <-stateCh:
if !ok {
// stateCh is closed either when *we* cancel the watch (on-exit via defer)
// or by the proxycfg.Manager when an irrecoverable error is encountered
// such as the ACL token getting deleted.
//
// We know for sure that this is the latter case, because in the former we
// would've already exited this loop.
return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again")
}
cfgSnap = cs
newRes, err := generator.AllResourcesFromSnapshot(cfgSnap)
if err != nil {
return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err)
}
// index and hash the xDS structures
newResourceMap := xdscommon.IndexResources(generator.Logger, newRes)
if s.ResourceMapMutateFn != nil {
s.ResourceMapMutateFn(newResourceMap)
}
if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil {
// err is already the result of calling status.Errorf
return err
}
if err := populateChildIndexMap(newResourceMap); err != nil {
return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
}
newVersions, err := computeResourceVersions(newResourceMap)
if err != nil {
return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
}
resourceMap = newResourceMap
currentVersions = newVersions
ready = true
}
// Trigger state machine
switch state {
case stateDeltaInit:
if node == nil {
// This can't happen (tm) since stateCh is nil until after the first req
// is received but lets not panic about it.
continue
}
nodeName := node.GetMetadata().GetFields()["node_name"].GetStringValue()
if nodeName == "" {
nodeName = s.NodeName
}
// Start authentication process, we need the proxyID
proxyID = structs.NewServiceID(node.Id, parseEnterpriseMeta(node))
// Start watching config for that proxy
var err error
options, err := external.QueryOptionsFromContext(stream.Context())
if err != nil {
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token)
switch {
case errors.Is(err, limiter.ErrCapacityReached):
return errOverwhelmed
case err != nil:
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
// Note that in this case we _intend_ the defer to only be triggered when
// this whole process method ends (i.e. when streaming RPC aborts) not at
// the end of the current loop iteration. We have to do it in the loop
// here since we can't start watching until we get to this state in the
// state machine.
defer watchCancel()
generator.Logger = generator.Logger.With("service_id", proxyID.String()) // enhance future logs
generator.Logger.Trace("watching proxy, pending initial proxycfg snapshot for xDS")
// Now wait for the config so we can check ACL
state = stateDeltaPendingInitialConfig
case stateDeltaPendingInitialConfig:
if cfgSnap == nil {
// Nothing we can do until we get the initial config
continue
}
// Got config, try to authenticate next.
state = stateDeltaRunning
// Upgrade the logger
switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy:
case structs.ServiceKindTerminatingGateway:
generator.Logger = generator.Logger.Named(logging.TerminatingGateway)
case structs.ServiceKindMeshGateway:
generator.Logger = generator.Logger.Named(logging.MeshGateway)
case structs.ServiceKindIngressGateway:
generator.Logger = generator.Logger.Named(logging.IngressGateway)
}
generator.Logger.Trace("Got initial config snapshot")
// Let's actually process the config we just got, or we'll miss responding
fallthrough
case stateDeltaRunning:
// Check ACLs on every Discovery{Request,Response}.
if err := checkStreamACLs(cfgSnap); err != nil {
return err
}
// For the first time through the state machine, this is when the
// timer is first started.
extendAuthTimer()
if !ready {
generator.Logger.Trace("Skipping delta computation because we haven't gotten a snapshot yet")
continue
}
generator.Logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")
streamStartOnce.Do(func() {
metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime)
})
for _, op := range xDSUpdateOrder {
if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)
// Receiving an ACK from Envoy will unblock the select statement above,
// and re-trigger an attempt to send these skipped updates.
break
}
if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
generator.Logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)
// Receiving an ACK from Envoy will unblock the select statement above,
// and re-trigger an attempt to send these skipped updates.
break
}
}
err, _ := handlers[op.TypeUrl].SendIfNew(
cfgSnap.Kind,
currentVersions[op.TypeUrl],
resourceMap,
&nonce,
op.Upsert,
op.Remove,
)
if err != nil {
return status.Errorf(codes.Unavailable,
"failed to send %sreply for type %q: %v",
op.errorLogNameReplyPrefix(),
op.TypeUrl, err)
}
}
}
}
}
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) {
var err error
envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node)
consulVersion, err := goversion.NewVersion(version.Version)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version")
}
serviceConfigs := extensionruntime.GetRuntimeConfigurations(cfgSnap)
for _, cfgs := range serviceConfigs {
for _, cfg := range cfgs {
resources, err = validateAndApplyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion)
if err != nil {
return nil, err
}
}
}
return resources, nil
}
func validateAndApplyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) (*xdscommon.IndexedResources, error) {
logFn := logger.Warn
if runtimeConfig.EnvoyExtension.Required {
logFn = logger.Error
}
svc := runtimeConfig.ServiceName
errorParams := []interface{}{
"extension", runtimeConfig.EnvoyExtension.Name,
"service", svc.Name,
"namespace", svc.Namespace,
"partition", svc.Partition,
}
getMetricLabels := func(err error) []metrics.Label {
return []metrics.Label{
{Name: "extension", Value: runtimeConfig.EnvoyExtension.Name},
{Name: "version", Value: "builtin/" + version.Version},
{Name: "service", Value: cfgSnap.Service},
{Name: "partition", Value: cfgSnap.ProxyID.PartitionOrDefault()},
{Name: "namespace", Value: cfgSnap.ProxyID.NamespaceOrDefault()},
{Name: "error", Value: strconv.FormatBool(err != nil)},
}
}
ext := runtimeConfig.EnvoyExtension
if v := ext.EnvoyVersion; v != "" {
c, err := goversion.NewConstraint(v)
if err != nil {
logFn("failed to parse Envoy extension version constraint", errorParams...)
if ext.Required {
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name)
}
return resources, nil
}
if !c.Check(envoyVersion) {
logger.Info("skipping envoy extension due to Envoy version constraint violation", errorParams...)
return resources, nil
}
}
if v := ext.ConsulVersion; v != "" {
c, err := goversion.NewConstraint(v)
if err != nil {
logFn("failed to parse Consul extension version constraint", errorParams...)
if ext.Required {
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name)
}
return resources, nil
}
if !c.Check(consulVersion) {
logger.Info("skipping envoy extension due to Consul version constraint violation", errorParams...)
return resources, nil
}
}
now := time.Now()
extender, err := envoyextensions.ConstructExtension(ext)
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate_arguments"}, now, getMetricLabels(err))
if err != nil {
errorParams = append(errorParams, "error", err)
logFn("failed to construct extension", errorParams...)
if ext.Required {
return nil, status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name)
}
return resources, nil
}
now = time.Now()
err = extender.Validate(&runtimeConfig)
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate"}, now, getMetricLabels(err))
if err != nil {
errorParams = append(errorParams, "error", err)
logFn("failed to validate extension arguments", errorParams...)
if ext.Required {
return nil, status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name)
}
return resources, nil
}
now = time.Now()
resources, err = applyEnvoyExtension(extender, resources, &runtimeConfig)
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, now, getMetricLabels(err))
if err != nil {
errorParams = append(errorParams, "error", err)
logFn("failed to apply envoy extension", errorParams...)
if ext.Required {
return nil, status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err)
}
}
return resources, nil
}
// applyEnvoyExtension safely checks whether an extension can be applied, and if so attempts to apply it.
//
// applyEnvoyExtension makes a copy of the provided IndexedResources, then applies the given extension to them.
// The copy ensures against partial application if a non-required extension modifies a resource then fails at a later
// stage; this is necessary because IndexedResources and its proto messages are all passed by reference, and
// non-required extensions do not lead to a terminal failure in xDS updates.
//
// If the application is successful, the modified copy is returned. If not, the original and an error is returned.
// Returning resources in either case allows for applying extensions in a loop and reporting on non-required extension
// failures simultaneously.
func applyEnvoyExtension(extender extensioncommon.EnvoyExtender, resources *xdscommon.IndexedResources, runtimeConfig *extensioncommon.RuntimeConfig) (r *xdscommon.IndexedResources, e error) {
// Don't panic due to an extension misbehaving.
defer func() {
if err := recover(); err != nil {
r = resources
e = fmt.Errorf("attempt to apply Envoy extension %q caused an unexpected panic: %v",
runtimeConfig.EnvoyExtension.Name, err)
}
}()
// First check whether the extension is eligible for application in the current environment.
// Do this before copying indexed resources for the sake of efficiency.
if !extender.CanApply(runtimeConfig) {
return resources, nil
}
newResources, err := extender.Extend(xdscommon.Clone(resources), runtimeConfig)
if err != nil {
return resources, err
}
return newResources, nil
}
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations
var xDSUpdateOrder = []xDSUpdateOperation{
// 1. SDS updates (if any) can be pushed here with no harm.
{TypeUrl: xdscommon.SecretType, Upsert: true},
// 2. CDS updates (if any) must always be pushed before the following types.
{TypeUrl: xdscommon.ClusterType, Upsert: true},
// 3. EDS updates (if any) must arrive after CDS updates for the respective clusters.
{TypeUrl: xdscommon.EndpointType, Upsert: true},
// 4. LDS updates must arrive after corresponding CDS/EDS updates.
{TypeUrl: xdscommon.ListenerType, Upsert: true, Remove: true},
// 5. RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates.
{TypeUrl: xdscommon.RouteType, Upsert: true, Remove: true},
// 6. (NOT IMPLEMENTED YET IN CONSUL) VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates.
// {},
// 7. Stale CDS clusters, related EDS endpoints (ones no longer being referenced) and SDS secrets can then be removed.
{TypeUrl: xdscommon.ClusterType, Remove: true},
{TypeUrl: xdscommon.EndpointType, Remove: true},
{TypeUrl: xdscommon.SecretType, Remove: true},
// xDS updates can be pushed independently if no new
// clusters/routes/listeners are added or if its acceptable to
// temporarily drop traffic during updates. Note that in case of
// LDS updates, the listeners will be warmed before they receive
// traffic, i.e. the dependent routes are fetched through RDS if
// configured. Clusters are warmed when adding/removing/updating
// clusters. On the other hand, routes are not warmed, i.e., the
// management plane must ensure that clusters referenced by a route
// are in place, before pushing the updates for a route.
}
type xDSUpdateOperation struct {
TypeUrl string
Upsert bool
Remove bool
}
func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
switch {
case op.Upsert && op.Remove:
return "upsert/remove "
case op.Upsert:
return "upsert "
case op.Remove:
return "remove "
default:
return ""
}
}
type xDSDeltaChild struct {
// childType is a type that in Envoy is actually stored within this type.
// Upserts of THIS type should potentially trigger dependent named
// resources within the child to be re-configured.
childType *xDSDeltaType
// childrenNames is map of parent resource names to a list of associated child resource
// names.
childrenNames map[string][]string
}
type xDSDeltaType struct {
generator *ResourceGenerator
stream ADSDeltaStream
typeURL string
allowEmptyFn func(kind structs.ServiceKind) bool
// deltaChild contains data for an xDS child type if there is one.
// For example, endpoints are a child type of clusters.
deltaChild *xDSDeltaChild
// registered indicates if this type has been requested at least once by
// the proxy
registered bool
// wildcard indicates that this type was requested with no preference for
// specific resource names. subscribe/unsubscribe are ignored.
wildcard bool
// sentToEnvoyOnce is true after we've sent one response to envoy.
sentToEnvoyOnce bool
// subscriptions is the set of currently subscribed envoy resources.
// If wildcard == true, this will be empty.
subscriptions map[string]struct{}
// resourceVersions is the current view of CONFIRMED/ACKed updates to
// envoy's view of the loaded resources.
//
// name => version
resourceVersions map[string]string
// pendingUpdates is a set of un-ACKed updates to the 'resourceVersions'
// map. Once we get an ACK from envoy we'll update the resourceVersions map
// and strike the entry from this map.
//
// nonce -> name -> {version}
pendingUpdates map[string]map[string]PendingUpdate
}
func (t *xDSDeltaType) subscribed(name string) bool {
if t.wildcard {
return true
}
_, subscribed := t.subscriptions[name]
return subscribed
}
type PendingUpdate struct {
Remove bool
Version string
}
func newDeltaType(
generator *ResourceGenerator,
stream ADSDeltaStream,
typeUrl string,
allowEmptyFn func(kind structs.ServiceKind) bool,
) *xDSDeltaType {
return &xDSDeltaType{
generator: generator,
stream: stream,
typeURL: typeUrl,
allowEmptyFn: allowEmptyFn,
subscriptions: make(map[string]struct{}),
resourceVersions: make(map[string]string),
pendingUpdates: make(map[string]map[string]PendingUpdate),
}
}
// Recv handles new discovery requests from envoy.
//
// Returns true the first time a type receives a request.
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xdscommon.SupportedProxyFeatures) deltaRecvResponse {
if t == nil {
return deltaRecvUnknownType // not something we care about
}
logger := t.generator.Logger.With("typeUrl", t.typeURL)
registeredThisTime := false
if !t.registered {
// We are in the wildcard mode if the first request of a particular
// type has empty subscription list
t.wildcard = len(req.ResourceNamesSubscribe) == 0
t.registered = true
registeredThisTime = true
}
/*
DeltaDiscoveryRequest can be sent in the following situations:
Initial message in a xDS bidirectional gRPC stream.
As an ACK or NACK response to a previous DeltaDiscoveryResponse. In
this case the response_nonce is set to the nonce value in the Response.
ACK or NACK is determined by the absence or presence of error_detail.
Spontaneous DeltaDiscoveryRequests from the client. This can be done to
dynamically add or remove elements from the tracked resource_names set.
In this case response_nonce must be omitted.
*/
/*
DeltaDiscoveryRequest plays two independent roles. Any
DeltaDiscoveryRequest can be either or both of:
*/
if req.ResponseNonce != "" {
/*
[2] (N)ACKing an earlier resource update from the server (using
response_nonce, with presence of error_detail making it a NACK).
*/
if req.ErrorDetail == nil {
logger.Trace("got ok response from envoy proxy", "nonce", req.ResponseNonce)
t.ack(req.ResponseNonce)
} else {
logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
"error", status.ErrorProto(req.ErrorDetail))
t.nack(req.ResponseNonce)
return deltaRecvResponseNack
}
}
if registeredThisTime && len(req.InitialResourceVersions) > 0 {
/*
Additionally, the first message (for a given type_url) of a
reconnected gRPC stream has a third role:
[3] informing the server of the resources (and their versions) that
the client already possesses, using the initial_resource_versions
field.
*/
logger.Trace("setting initial resource versions for stream",
"resources", req.InitialResourceVersions)
t.resourceVersions = req.InitialResourceVersions
if !t.wildcard {
for k := range req.InitialResourceVersions {
t.subscriptions[k] = struct{}{}
}
}
}
if !t.wildcard {
/*
[1] informing the server of what resources the client has
gained/lost interest in (using resource_names_subscribe and
resource_names_unsubscribe), or
*/
for _, name := range req.ResourceNamesSubscribe {
// A resource_names_subscribe field may contain resource names that
// the server believes the client is already subscribed to, and
// furthermore has the most recent versions of. However, the server
// must still provide those resources in the response; due to
// implementation details hidden from the server, the client may
// have “forgotten” those resources despite apparently remaining
// subscribed.
//
// NOTE: the server must respond with all resources listed in
// resource_names_subscribe, even if it believes the client has the
// most recent version of them. The reason: the client may have
// dropped them, but then regained interest before it had a chance
// to send the unsubscribe message.
//
// We handle that here by ALWAYS wiping the version so the diff
// decides to send the value.
_, alreadySubscribed := t.subscriptions[name]
t.subscriptions[name] = struct{}{}
// Reset the tracked version so we force a reply.
if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked {
t.resourceVersions[name] = ""
}
// Certain xDS types are children of other types, meaning that if Envoy subscribes to a parent.
// We MUST assume that if Envoy ever had data for the children of this parent, then the child's
// data is gone.
if t.deltaChild != nil && t.deltaChild.childType.registered {
for _, childName := range t.deltaChild.childrenNames[name] {
t.ensureChildResend(name, childName)
}
}
if alreadySubscribed {
logger.Trace("re-subscribing resource for stream", "resource", name)
} else {
logger.Trace("subscribing resource for stream", "resource", name)
}
}
for _, name := range req.ResourceNamesUnsubscribe {
if _, ok := t.subscriptions[name]; !ok {
continue
}
delete(t.subscriptions, name)
logger.Trace("unsubscribing resource for stream", "resource", name)
// NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions
}
}
if registeredThisTime {
return deltaRecvNewSubscription
}
return deltaRecvResponseAck
}
func (t *xDSDeltaType) ack(nonce string) {
pending, ok := t.pendingUpdates[nonce]
if !ok {
return
}
for name, obj := range pending {
if obj.Remove {
delete(t.resourceVersions, name)
continue
}
t.resourceVersions[name] = obj.Version
}
t.sentToEnvoyOnce = true
delete(t.pendingUpdates, nonce)
}
func (t *xDSDeltaType) nack(nonce string) {
delete(t.pendingUpdates, nonce)
}
func (t *xDSDeltaType) SendIfNew(
kind structs.ServiceKind,
currentVersions map[string]string, // type => name => version (as consul knows right now)
resourceMap *xdscommon.IndexedResources,
nonce *uint64,
upsert, remove bool,
) (error, bool) {
if t == nil || !t.registered {
return nil, false
}
// Wait for Envoy to catch up with this delta type before sending something new.
if len(t.pendingUpdates) > 0 {
return nil, false
}
logger := t.generator.Logger.With("typeUrl", t.typeURL)
allowEmpty := t.allowEmptyFn != nil && t.allowEmptyFn(kind)
// Zero length resource responses should be ignored and are the result of no
// data yet. Notice that this caused a bug originally where we had zero
// healthy endpoints for an upstream that would cause Envoy to hang waiting
// for the EDS response. This is fixed though by ensuring we send an explicit
// empty LoadAssignment resource for the cluster rather than allowing junky
// empty resources.
if len(currentVersions) == 0 && !allowEmpty {
// Nothing to send yet
return nil, false
}
resp, updates, err := t.createDeltaResponse(currentVersions, resourceMap, upsert, remove)
if err != nil {
return err, false
}
if resp == nil {
return nil, false
}
*nonce++
resp.Nonce = fmt.Sprintf("%08x", *nonce)
t.generator.logTraceResponse("Incremental xDS v3", resp)
logger.Trace("sending response", "nonce", resp.Nonce)
if err := t.stream.Send(resp); err != nil {
return err, false
}
logger.Trace("sent response", "nonce", resp.Nonce)
// Certain xDS types are children of other types, meaning that if an update is pushed for a parent,
// we MUST send new data for all its children. Envoy will NOT re-subscribe to the child data upon
// receiving updates for the parent, so we need to handle this ourselves.
//
// Note that we do not check whether the deltaChild.childType is registered here, since we send
// parent types before child types, meaning that it's expected on first send of a parent that
// there are no subscriptions for the child type.
if t.deltaChild != nil {
for name := range updates {
if children, ok := resourceMap.ChildIndex[t.typeURL][name]; ok {
// Capture the relevant child resource names on this pending update so
// we can know the linked children if Envoy ever re-subscribes to the parent resource.
t.deltaChild.childrenNames[name] = children
for _, childName := range children {
t.ensureChildResend(name, childName)
}
}
}
}
t.pendingUpdates[resp.Nonce] = updates
return nil, true
}
func (t *xDSDeltaType) createDeltaResponse(
currentVersions map[string]string, // name => version (as consul knows right now)
resourceMap *xdscommon.IndexedResources,
upsert, remove bool,
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
// compute difference
var (
hasRelevantUpdates = false
updates = make(map[string]PendingUpdate)
)
if t.wildcard {
// First find things that need updating or deleting
for name, envoyVers := range t.resourceVersions {
currVers, ok := currentVersions[name]
if !ok {
if remove {
hasRelevantUpdates = true
}
updates[name] = PendingUpdate{Remove: true}
} else if currVers != envoyVers {
if upsert {
hasRelevantUpdates = true
}
updates[name] = PendingUpdate{Version: currVers}
}
}
// Now find new things
for name, currVers := range currentVersions {
if _, known := t.resourceVersions[name]; known {
continue
}
if upsert {
hasRelevantUpdates = true
}
updates[name] = PendingUpdate{Version: currVers}
}
} else {
// First find things that need updating or deleting
// Walk the list of things currently stored in envoy
for name, envoyVers := range t.resourceVersions {
if t.subscribed(name) {
if currVers, ok := currentVersions[name]; ok {
if currVers != envoyVers {
if upsert {
hasRelevantUpdates = true
}
updates[name] = PendingUpdate{Version: currVers}
}
}
}
}
// Now find new things not in envoy yet
for name := range t.subscriptions {
if _, known := t.resourceVersions[name]; known {
continue
}
if currVers, ok := currentVersions[name]; ok {
updates[name] = PendingUpdate{Version: currVers}
if upsert {
hasRelevantUpdates = true
}
}
}
}
if !hasRelevantUpdates && t.sentToEnvoyOnce {
return nil, nil, nil
}
// now turn this into a disco response
resp := &envoy_discovery_v3.DeltaDiscoveryResponse{
// TODO(rb): consider putting something in SystemVersionInfo?
TypeUrl: t.typeURL,
}
realUpdates := make(map[string]PendingUpdate)
for name, obj := range updates {
if obj.Remove {
if remove {
resp.RemovedResources = append(resp.RemovedResources, name)
realUpdates[name] = PendingUpdate{Remove: true}
}
} else if upsert {
resources, ok := resourceMap.Index[t.typeURL]
if !ok {
return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
}
res, ok := resources[name]
if !ok {
return nil, nil, fmt.Errorf("unknown name for type url %q: %s", t.typeURL, name)
}
any, err := anypb.New(res)
if err != nil {
return nil, nil, err
}
resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
Name: name,
Resource: any,
Version: obj.Version,
})
realUpdates[name] = obj
}
}
return resp, realUpdates, nil
}
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
if _, exist := t.deltaChild.childType.resourceVersions[childName]; !exist {
return
}
if !t.subscribed(childName) {
return
}
t.generator.Logger.Trace(
"triggering implicit update of resource",
"typeUrl", t.typeURL,
"resource", parentName,
"childTypeUrl", t.deltaChild.childType.typeURL,
"childResource", childName,
)
// resourceVersions tracks the last known version for this childName that Envoy
// has ACKed. By setting this to empty it effectively tells us that Envoy does
// not have any data for that child, and we need to re-send.
t.deltaChild.childType.resourceVersions[childName] = ""
}
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
out := make(map[string]map[string]string)
for typeUrl, resources := range resourceMap.Index {
m, err := hashResourceMap(resources)
if err != nil {
return nil, fmt.Errorf("failed to hash resources for %q: %v", typeUrl, err)
}
out[typeUrl] = m
}
return out, nil
}
func populateChildIndexMap(resourceMap *xdscommon.IndexedResources) error {
// LDS and RDS have a more complicated relationship.
for name, res := range resourceMap.Index[xdscommon.ListenerType] {
listener := res.(*envoy_listener_v3.Listener)
rdsRouteNames, err := extractRdsResourceNames(listener)
if err != nil {
return err
}
resourceMap.ChildIndex[xdscommon.ListenerType][name] = rdsRouteNames
}
// CDS and EDS share exact names.
for name := range resourceMap.Index[xdscommon.ClusterType] {
resourceMap.ChildIndex[xdscommon.ClusterType][name] = []string{name}
}
return nil
}
func hashResourceMap(resources map[string]proto.Message) (map[string]string, error) {
m := make(map[string]string)
for name, res := range resources {
h, err := hashResource(res)
if err != nil {
return nil, fmt.Errorf("failed to hash resource %q: %v", name, err)
}
m[name] = h
}
return m, nil
}
// hashResource will take a resource and create a SHA256 hash sum out of the marshaled bytes
func hashResource(res proto.Message) (string, error) {
h := sha256.New()
marshaller := proto.MarshalOptions{Deterministic: true}
data, err := marshaller.Marshal(res)
if err != nil {
return "", err
}
h.Write(data)
return hex.EncodeToString(h.Sum(nil)), nil
}