// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package xds

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"os"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/armon/go-metrics"
	envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
	envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
	"github.com/hashicorp/go-hclog"
	goversion "github.com/hashicorp/go-version"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	"github.com/hashicorp/consul/agent/envoyextensions"
	external "github.com/hashicorp/consul/agent/grpc-external"
	"github.com/hashicorp/consul/agent/grpc-external/limiter"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/xds/configfetcher"
	"github.com/hashicorp/consul/agent/xds/extensionruntime"
	"github.com/hashicorp/consul/agent/xdsv2"
	"github.com/hashicorp/consul/envoyextensions/extensioncommon"
	"github.com/hashicorp/consul/envoyextensions/xdscommon"
	proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
	proxytracker "github.com/hashicorp/consul/internal/mesh/proxy-tracker"
	"github.com/hashicorp/consul/logging"
	pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
	"github.com/hashicorp/consul/proto-public/pbresource"
	"github.com/hashicorp/consul/version"
)

// errOverwhelmed is the ResourceExhausted status returned to a proxy when this
// server will not keep serving its xDS stream: processDelta returns it when the
// drain channel fires and when the proxy watcher reports
// limiter.ErrCapacityReached.
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")

// xdsProtocolLegacyChildResend enables the legacy behavior for the `ensureChildResend` function.
// It is true when the XDS_PROTOCOL_LEGACY_CHILD_RESEND environment variable is set to any
// non-empty value; the variable is read once, when this package variable is initialized.
// The environment variable exists as an escape hatch so that users can fall back to the
// legacy behavior, if needed.
// Ideally, this is a flag we can remove in 1.19+
var xdsProtocolLegacyChildResend = (os.Getenv("XDS_PROTOCOL_LEGACY_CHILD_RESEND") != "")

// deltaRecvResponse classifies what xDSDeltaType.Recv did with an incoming
// DeltaDiscoveryRequest.
type deltaRecvResponse int

const (
	// deltaRecvResponseNack: the request carried a response nonce together
	// with an error detail, i.e. envoy rejected an earlier response.
	deltaRecvResponseNack deltaRecvResponse = iota
	// deltaRecvResponseAck: the request was processed and was neither a NACK
	// nor the first request seen for its type.
	deltaRecvResponseAck
	// deltaRecvNewSubscription: this was the first request seen for its type.
	deltaRecvNewSubscription
	// deltaRecvUnknownType: Recv was invoked on a nil handler, meaning the
	// type is not one we care about.
	deltaRecvUnknownType
)

// ADSDeltaStream is a short alias for the server side of the bidirectional
// DeltaAggregatedResources gRPC stream generated by go-control-plane.
type ADSDeltaStream = envoy_discovery_v3.AggregatedDiscoveryService_DeltaAggregatedResourcesServer

// DeltaAggregatedResources implements envoy_discovery_v3.AggregatedDiscoveryServiceServer
//
// It starts a goroutine that forwards every request read from the stream onto
// a channel, then runs processDelta on the calling goroutine until the stream
// is finished. Any error from processDelta is logged and returned.
func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
	defer s.activeStreams.Increment(stream.Context())()

	// a channel for receiving incoming requests
	reqCh := make(chan *envoy_discovery_v3.DeltaDiscoveryRequest)

	// done is closed once processDelta has returned. Without it the receive
	// goroutine could block forever on the unbuffered send below, because
	// nothing reads reqCh any more at that point.
	done := make(chan struct{})

	reqStop := int32(0)
	go func() {
		for {
			req, err := stream.Recv()
			if atomic.LoadInt32(&reqStop) != 0 {
				// processDelta already returned; whatever Recv produced
				// (request or error) is of no interest any more.
				return
			}
			if err != nil {
				s.Logger.Error("Error receiving new DeltaDiscoveryRequest; closing request channel", "error", err)
				close(reqCh)
				return
			}
			select {
			case reqCh <- req:
			case <-done:
				return
			}
		}
	}()

	err := s.processDelta(stream, reqCh)
	if err != nil {
		s.Logger.Error("Error handling ADS delta stream", "xdsVersion", "v3", "error", err)
	}

	// Stop the receive goroutine: the flag makes it discard the result of a
	// Recv that returns after this point, and closing done releases it if it is
	// parked on a send to reqCh.
	atomic.StoreInt32(&reqStop, 1)
	close(done)

	return err
}

// getEnvoyConfiguration generates the Envoy configuration for a proxy
// snapshot. The resource generator is selected by the snapshot's concrete
// type: a *proxycfg.ConfigSnapshot is handled by this package's resource
// generator and a *proxytracker.ProxyState by the xdsv2 generator. Any other
// ProxySnapshot implementation yields an error.
func getEnvoyConfiguration(proxySnapshot proxysnapshot.ProxySnapshot, logger hclog.Logger, cfgFetcher configfetcher.ConfigFetcher) (map[string][]proto.Message, error) {
	switch snap := proxySnapshot.(type) {
	case *proxycfg.ConfigSnapshot:
		logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ConfigSnapshot")
		return NewResourceGenerator(logger, cfgFetcher, true).AllResourcesFromSnapshot(snap)

	case *proxytracker.ProxyState:
		logger.Trace("ProxySnapshot update channel received a ProxySnapshot of type ProxyState")
		generated, genErr := xdsv2.NewResourceGenerator(logger).AllResourcesFromIR(snap)
		if genErr == nil {
			return generated, nil
		}
		logger.Error("error generating resources from proxy state template", "err", genErr)
		return nil, genErr
	}

	return nil, errors.New("proxysnapshot must be of type ProxyState or ConfigSnapshot")
}

// States of the per-stream state machine driven by processDelta.
const (
	// stateDeltaInit: no proxy watch has been started yet; processDelta is
	// waiting for a request that carries the Envoy node.
	stateDeltaInit int = iota
	// stateDeltaPendingInitialConfig: the proxy watch is running but no
	// snapshot has been received yet.
	stateDeltaPendingInitialConfig
	// stateDeltaRunning: a snapshot has been received; ACLs are checked and
	// resource handlers are invoked on every pass.
	stateDeltaRunning
)

// processDelta runs the incremental xDS state machine for a single stream.
//
// After authenticating the stream it loops over four event sources: the drain
// channel, the ACL re-check timer, requests arriving on reqCh and proxy
// snapshots arriving on stateCh. Following each event the state machine is
// advanced:
//
//   - stateDeltaInit: once the Envoy node is known, start watching the proxy
//     through s.ProxyWatcher.
//   - stateDeltaPendingInitialConfig: wait for the first snapshot.
//   - stateDeltaRunning: re-check ACLs and offer the current resources to each
//     type handler in xDSUpdateOrder.
//
// It returns nil when reqCh is closed, and otherwise the error that terminated
// the stream.
func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discovery_v3.DeltaDiscoveryRequest) error {
	// Handle invalid ACL tokens up-front.
	if _, err := s.authenticate(stream.Context()); err != nil {
		return err
	}

	// Loop state
	var (
		proxySnapshot proxysnapshot.ProxySnapshot
		node          *envoy_config_core_v3.Node
		stateCh       <-chan proxysnapshot.ProxySnapshot
		drainCh       limiter.SessionTerminatedChan
		watchCancel   func()
		nonce         uint64 // xDS requires a unique nonce to correlate response/request pairs
		ready         bool   // set to true after the first snapshot arrives

		streamStartTime = time.Now()
		streamStartOnce sync.Once
	)

	var (
		// resourceMap is the SoTW we are incrementally attempting to sync to envoy.
		//
		// type => name => proto
		resourceMap = xdscommon.EmptyIndexedResources()

		// currentVersions is the xDS versioning represented by Resources.
		//
		// type => name => version (as consul knows right now)
		currentVersions = make(map[string]map[string]string)
	)

	logger := s.Logger.Named(logging.XDS).With("xdsVersion", "v3")

	// need to run a small state machine to get through initial authentication.
	var state = stateDeltaInit

	// Configure handlers for each type of request we currently care about.
	// The allow-empty callbacks read proxySnapshot lazily, so they always see
	// the most recent snapshot assigned in the loop below.
	handlers := map[string]*xDSDeltaType{
		xdscommon.ListenerType: newDeltaType(logger, stream, xdscommon.ListenerType, func() bool {
			return proxySnapshot.AllowEmptyListeners()
		}),
		xdscommon.RouteType: newDeltaType(logger, stream, xdscommon.RouteType, func() bool {
			return proxySnapshot.AllowEmptyRoutes()
		}),
		xdscommon.ClusterType: newDeltaType(logger, stream, xdscommon.ClusterType, func() bool {
			return proxySnapshot.AllowEmptyClusters()
		}),
		xdscommon.EndpointType: newDeltaType(logger, stream, xdscommon.EndpointType, nil),
		xdscommon.SecretType:   newDeltaType(logger, stream, xdscommon.SecretType, nil), // TODO allowEmptyFn
	}

	// Endpoints are stored within a Cluster (and Routes
	// are stored within a Listener) so whenever the
	// enclosing resource is updated the inner resource
	// list is cleared implicitly.
	//
	// When this happens we should update our local
	// representation of envoy state to force an update.
	//
	// see: https://github.com/envoyproxy/envoy/issues/13009
	handlers[xdscommon.ListenerType].deltaChild = &xDSDeltaChild{
		childType:     handlers[xdscommon.RouteType],
		childrenNames: make(map[string][]string),
	}
	handlers[xdscommon.ClusterType].deltaChild = &xDSDeltaChild{
		childType:     handlers[xdscommon.EndpointType],
		childrenNames: make(map[string][]string),
	}

	// authTimer stays nil (and therefore never fires in the select below)
	// until extendAuthTimer is first called from the running state.
	var authTimer <-chan time.Time
	extendAuthTimer := func() {
		authTimer = time.After(s.AuthCheckFrequency)
	}

	checkStreamACLs := func(proxySnap proxysnapshot.ProxySnapshot) error {
		return s.authorize(stream.Context(), proxySnap)
	}

	for {
		select {
		case <-drainCh:
			logger.Debug("draining stream to rebalance load")
			metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
			return errOverwhelmed
		case <-authTimer:
			// It's been too long since a Discovery{Request,Response} so recheck ACLs.
			if err := checkStreamACLs(proxySnapshot); err != nil {
				return err
			}
			extendAuthTimer()

		case req, ok := <-reqCh:
			if !ok {
				// reqCh is closed when stream.Recv errors which is how we detect client
				// going away. AFAICT the stream.Context() is only canceled once the
				// RPC method returns which it can't until we return from this one so
				// there's no point in blocking on that.
				return nil
			}

			logTraceRequest(logger, "Incremental xDS v3", req)

			if req.TypeUrl == "" {
				return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
			}

			// proxyFeatures is only populated for the request that first
			// supplies the node; for every later request it is the zero value.
			var proxyFeatures xdscommon.SupportedProxyFeatures
			if node == nil && req.Node != nil {
				node = req.Node
				var err error
				proxyFeatures, err = xdscommon.DetermineSupportedProxyFeatures(req.Node)
				if err != nil {
					// Use status.Error rather than status.Errorf: the message is
					// not a format string and may legitimately contain '%'.
					return status.Error(codes.InvalidArgument, err.Error())
				}
			}

			// Requests for type URLs without a handler are ignored here.
			if handler, ok := handlers[req.TypeUrl]; ok {
				switch handler.Recv(req, proxyFeatures) {
				case deltaRecvNewSubscription:
					logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)

				case deltaRecvResponseNack:
					logger.Trace("got nack response for type", "typeUrl", req.TypeUrl)

					// There is no reason to believe that generating new xDS resources from the same snapshot
					// would lead to an ACK from Envoy. Instead we continue to the top of this for loop and wait
					// for a new request or snapshot.
					continue
				}
			}

		case cs, ok := <-stateCh:
			if !ok {
				// stateCh is closed either when *we* cancel the watch (on-exit via defer)
				// or by the proxycfg.Manager when an irrecoverable error is encountered
				// such as the ACL token getting deleted.
				//
				// We know for sure that this is the latter case, because in the former we
				// would've already exited this loop.
				return status.Error(codes.Aborted, "xDS stream terminated due to an irrecoverable error, please try again")
			}
			proxySnapshot = cs

			newRes, err := getEnvoyConfiguration(proxySnapshot, logger, s.CfgFetcher)
			if err != nil {
				return status.Errorf(codes.Unavailable, "failed to generate all xDS resources from the snapshot: %v", err)
			}

			// index and hash the xDS structures
			newResourceMap := xdscommon.IndexResources(logger, newRes)

			if s.ResourceMapMutateFn != nil {
				s.ResourceMapMutateFn(newResourceMap)
			}

			if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, proxySnapshot, node); err != nil {
				// err is already the result of calling status.Errorf
				return err
			}

			if err := populateChildIndexMap(newResourceMap); err != nil {
				return status.Errorf(codes.Unavailable, "failed to index xDS resource versions: %v", err)
			}

			newVersions, err := computeResourceVersions(newResourceMap)
			if err != nil {
				return status.Errorf(codes.Unavailable, "failed to compute xDS resource versions: %v", err)
			}

			// Only publish the new state once every step above succeeded.
			resourceMap = newResourceMap
			currentVersions = newVersions
			ready = true
		}

		// Trigger state machine
		switch state {
		case stateDeltaInit:
			if node == nil {
				// This can't happen (tm) since stateCh is nil until after the first req
				// is received but lets not panic about it.
				continue
			}

			nodeName := node.GetMetadata().GetFields()["node_name"].GetStringValue()
			if nodeName == "" {
				nodeName = s.NodeName
			}

			// Start authentication process, we need the proxyID
			proxyID := newResourceIDFromEnvoyNode(node)

			// Start watching config for that proxy
			var err error
			options, err := external.QueryOptionsFromContext(stream.Context())
			if err != nil {
				return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
			}

			stateCh, drainCh, watchCancel, err = s.ProxyWatcher.Watch(proxyID, nodeName, options.Token)
			switch {
			case errors.Is(err, limiter.ErrCapacityReached):
				return errOverwhelmed
			case err != nil:
				return status.Errorf(codes.Internal, "failed to watch proxy: %s", err)
			}
			// Note that in this case we _intend_ the defer to only be triggered when
			// this whole process method ends (i.e. when streaming RPC aborts) not at
			// the end of the current loop iteration. We have to do it in the loop
			// here since we can't start watching until we get to this state in the
			// state machine.
			defer watchCancel()

			logger = logger.With("service_id", proxyID.Name) // enhance future logs

			logger.Trace("watching proxy, pending initial proxycfg snapshot for xDS")

			// Now wait for the config so we can check ACL
			state = stateDeltaPendingInitialConfig
		case stateDeltaPendingInitialConfig:
			if proxySnapshot == nil {
				// Nothing we can do until we get the initial config
				continue
			}

			// Got config, try to authenticate next.
			state = stateDeltaRunning

			// Upgrade the logger
			loggerName := proxySnapshot.LoggerName()
			if loggerName != "" {
				logger = logger.Named(loggerName)
			}

			logger.Trace("Got initial config snapshot")

			// Let's actually process the config we just got, or we'll miss responding
			fallthrough
		case stateDeltaRunning:
			// Check ACLs on every Discovery{Request,Response}.
			if err := checkStreamACLs(proxySnapshot); err != nil {
				return err
			}
			// For the first time through the state machine, this is when the
			// timer is first started.
			extendAuthTimer()

			if !ready {
				logger.Trace("Skipping delta computation because we haven't gotten a snapshot yet")
				continue
			}

			logger.Trace("Invoking all xDS resource handlers and sending changed data if there are any")

			streamStartOnce.Do(func() {
				metrics.MeasureSince([]string{"xds", "server", "streamStart"}, streamStartTime)
			})

			for _, op := range xDSUpdateOrder {
				if op.TypeUrl == xdscommon.ListenerType || op.TypeUrl == xdscommon.RouteType {
					// Note: the break statements below leave the xDSUpdateOrder
					// loop altogether, so every later operation in this pass is
					// deferred as well, not just the current one.
					if clusterHandler := handlers[xdscommon.ClusterType]; clusterHandler.registered && len(clusterHandler.pendingUpdates) > 0 {
						logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
							"typeUrl", op.TypeUrl, "dependent", xdscommon.ClusterType)

						// Receiving an ACK from Envoy will unblock the select statement above,
						// and re-trigger an attempt to send these skipped updates.
						break
					}
					if endpointHandler := handlers[xdscommon.EndpointType]; endpointHandler.registered && len(endpointHandler.pendingUpdates) > 0 {
						logger.Trace("Skipping delta computation for resource because there are dependent updates pending",
							"typeUrl", op.TypeUrl, "dependent", xdscommon.EndpointType)

						// Receiving an ACK from Envoy will unblock the select statement above,
						// and re-trigger an attempt to send these skipped updates.
						break
					}
				}
				err, _ := handlers[op.TypeUrl].SendIfNew(currentVersions[op.TypeUrl], resourceMap, &nonce, op.Upsert, op.Remove)
				if err != nil {
					return status.Errorf(codes.Unavailable,
						"failed to send %sreply for type %q: %v",
						op.errorLogNameReplyPrefix(),
						op.TypeUrl, err)
				}
			}
		}
	}
}

// newResourceIDFromEnvoyNode builds the pbresource.ID that identifies an Envoy
// proxy node, so that delta xDS code can call ProxyWatcher.Watch() with a
// resource ID. The ID is named after the node's Id, typed as a
// ProxyStateTemplate, and scoped to the namespace and partition parsed from
// the node (falling back to the defaults).
func newResourceIDFromEnvoyNode(node *envoy_config_core_v3.Node) *pbresource.ID {
	entMeta := parseEnterpriseMeta(node)

	tenancy := &pbresource.Tenancy{
		Partition: entMeta.PartitionOrDefault(),
		Namespace: entMeta.NamespaceOrDefault(),
	}

	id := &pbresource.ID{
		Type:    pbmesh.ProxyStateTemplateType,
		Tenancy: tenancy,
		Name:    node.Id,
	}
	return id
}

// applyEnvoyExtensions runs every Envoy extension configured for the snapshot
// against resources and returns the resulting resources.
//
// Envoy extensions are not yet supported with ProxyState, so a
// *proxytracker.ProxyState snapshot is a no-op that returns resources
// untouched. Snapshot types other than ProxyState and ConfigSnapshot are
// rejected with an InvalidArgument status. The first error reported while
// applying an extension aborts the loop and is returned as-is.
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, proxySnapshot proxysnapshot.ProxySnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) {
	var cfgSnap *proxycfg.ConfigSnapshot
	switch typed := proxySnapshot.(type) {
	case *proxycfg.ConfigSnapshot:
		cfgSnap = typed
	case *proxytracker.ProxyState:
		// TODO(proxystate): implement envoy extensions for ProxyState
		return resources, nil
	default:
		return nil, status.Errorf(codes.InvalidArgument,
			"unsupported config snapshot type to apply envoy extensions to %T", proxySnapshot)
	}

	envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node)
	consulVersion, parseErr := goversion.NewVersion(version.Version)
	if parseErr != nil {
		return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version")
	}

	for _, cfgs := range extensionruntime.GetRuntimeConfigurations(cfgSnap) {
		for _, cfg := range cfgs {
			updated, applyErr := validateAndApplyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion)
			if applyErr != nil {
				return nil, applyErr
			}
			// Feed the output of this extension into the next one.
			resources = updated
		}
	}

	return resources, nil
}

// validateAndApplyEnvoyExtension applies a single Envoy extension to resources.
//
// The extension is skipped (resources returned unchanged, nil error) when its
// Envoy or Consul version constraint is not satisfied. Otherwise it is
// constructed, validated and applied, with a timing metric emitted for each of
// those three stages.
//
// A failure at any stage is logged at error level for a required extension and
// at warn level otherwise. For a required extension the failure is returned as
// an InvalidArgument status with nil resources; for an optional extension the
// failure is swallowed and the resources are returned without the extension's
// changes.
func validateAndApplyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) (*xdscommon.IndexedResources, error) {
	ext := runtimeConfig.EnvoyExtension
	svc := runtimeConfig.ServiceName

	// Pick the log level once, based on whether the extension is required.
	report := logger.Warn
	if ext.Required {
		report = logger.Error
	}

	// Key/value pairs attached to every log line about this extension.
	logFields := []interface{}{
		"extension", ext.Name,
		"service", svc.Name,
		"namespace", svc.Namespace,
		"partition", svc.Partition,
	}

	// labelsFor builds the metric labels for one stage; stageErr only
	// influences the "error" label.
	labelsFor := func(stageErr error) []metrics.Label {
		failed := strconv.FormatBool(stageErr != nil)
		return []metrics.Label{
			{Name: "extension", Value: runtimeConfig.EnvoyExtension.Name},
			{Name: "version", Value: "builtin/" + version.Version},
			{Name: "service", Value: cfgSnap.Service},
			{Name: "partition", Value: cfgSnap.ProxyID.PartitionOrDefault()},
			{Name: "namespace", Value: cfgSnap.ProxyID.NamespaceOrDefault()},
			{Name: "error", Value: failed},
		}
	}

	// Envoy version constraint.
	if raw := ext.EnvoyVersion; raw != "" {
		constraint, parseErr := goversion.NewConstraint(raw)
		if parseErr != nil {
			report("failed to parse Envoy extension version constraint", logFields...)
			if !ext.Required {
				return resources, nil
			}
			return nil, status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name)
		}
		if !constraint.Check(envoyVersion) {
			logger.Info("skipping envoy extension due to Envoy version constraint violation", logFields...)
			return resources, nil
		}
	}

	// Consul version constraint.
	if raw := ext.ConsulVersion; raw != "" {
		constraint, parseErr := goversion.NewConstraint(raw)
		if parseErr != nil {
			report("failed to parse Consul extension version constraint", logFields...)
			if !ext.Required {
				return resources, nil
			}
			return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name)
		}
		if !constraint.Check(consulVersion) {
			logger.Info("skipping envoy extension due to Consul version constraint violation", logFields...)
			return resources, nil
		}
	}

	// Stage 1: construct the extension.
	stageStart := time.Now()
	extender, err := envoyextensions.ConstructExtension(ext)
	metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate_arguments"}, stageStart, labelsFor(err))
	if err != nil {
		logFields = append(logFields, "error", err)
		report("failed to construct extension", logFields...)
		if !ext.Required {
			return resources, nil
		}
		return nil, status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name)
	}

	// Stage 2: validate it against the runtime configuration.
	stageStart = time.Now()
	err = extender.Validate(&runtimeConfig)
	metrics.MeasureSinceWithLabels([]string{"envoy_extension", "validate"}, stageStart, labelsFor(err))
	if err != nil {
		logFields = append(logFields, "error", err)
		report("failed to validate extension arguments", logFields...)
		if !ext.Required {
			return resources, nil
		}
		return nil, status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name)
	}

	// Stage 3: apply it. applyEnvoyExtension hands back a usable resource set
	// even on failure, which is what an optional extension falls through with.
	stageStart = time.Now()
	resources, err = applyEnvoyExtension(extender, resources, &runtimeConfig)
	metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, stageStart, labelsFor(err))
	if err == nil {
		return resources, nil
	}

	logFields = append(logFields, "error", err)
	report("failed to apply envoy extension", logFields...)
	if ext.Required {
		return nil, status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err)
	}
	return resources, nil
}

// applyEnvoyExtension applies one extension to a copy of resources, guarding
// against misbehaving extensions.
//
// The extension runs against a clone of the provided IndexedResources rather
// than the original. IndexedResources and its proto messages are passed by
// reference, so without the clone a non-required extension that modifies a
// resource and then fails would leave a partial application behind, and such
// failures do not terminate the xDS update.
//
// On success the extended clone is returned. If the extension cannot be
// applied in the current environment the original is returned with a nil
// error. If the extension returns an error or panics, the original is returned
// together with an error. Returning resources in every case lets callers apply
// extensions in a loop while still reporting non-required extension failures.
func applyEnvoyExtension(extender extensioncommon.EnvoyExtender, resources *xdscommon.IndexedResources, runtimeConfig *extensioncommon.RuntimeConfig) (r *xdscommon.IndexedResources, e error) {
	// Convert a panic inside the extension into an ordinary error.
	defer func() {
		panicVal := recover()
		if panicVal == nil {
			return
		}
		r = resources
		e = fmt.Errorf("attempt to apply Envoy extension %q caused an unexpected panic: %v",
			runtimeConfig.EnvoyExtension.Name, panicVal)
	}()

	// Check eligibility before paying for the clone.
	if extender.CanApply(runtimeConfig) {
		extended, extendErr := extender.Extend(xdscommon.Clone(resources), runtimeConfig)
		if extendErr != nil {
			return resources, extendErr
		}
		return extended, nil
	}

	return resources, nil
}

// xDSUpdateOrder is the ordered list of operations processDelta walks on each
// pass, handing every entry's type URL and upsert/remove flags to the matching
// type handler. The order follows the eventual-consistency guidance in the
// xDS protocol documentation:
//
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations
var xDSUpdateOrder = []xDSUpdateOperation{
	// 1. SDS updates (if any) can be pushed here with no harm.
	{TypeUrl: xdscommon.SecretType, Upsert: true},
	// 2. CDS updates (if any) must always be pushed before the following types.
	{TypeUrl: xdscommon.ClusterType, Upsert: true},
	// 3. EDS updates (if any) must arrive after CDS updates for the respective clusters.
	{TypeUrl: xdscommon.EndpointType, Upsert: true},
	// 4. LDS updates must arrive after corresponding CDS/EDS updates.
	{TypeUrl: xdscommon.ListenerType, Upsert: true, Remove: true},
	// 5. RDS updates related to the newly added listeners must arrive after CDS/EDS/LDS updates.
	{TypeUrl: xdscommon.RouteType, Upsert: true, Remove: true},
	// 6. (NOT IMPLEMENTED YET IN CONSUL) VHDS updates (if any) related to the newly added RouteConfigurations must arrive after RDS updates.
	// {},
	// 7. Stale CDS clusters, related EDS endpoints (ones no longer being referenced) and SDS secrets can then be removed.
	{TypeUrl: xdscommon.ClusterType, Remove: true},
	{TypeUrl: xdscommon.EndpointType, Remove: true},
	{TypeUrl: xdscommon.SecretType, Remove: true},
	// xDS updates can be pushed independently if no new
	// clusters/routes/listeners are added or if it’s acceptable to
	// temporarily drop traffic during updates. Note that in case of
	// LDS updates, the listeners will be warmed before they receive
	// traffic, i.e. the dependent routes are fetched through RDS if
	// configured. Clusters are warmed when adding/removing/updating
	// clusters. On the other hand, routes are not warmed, i.e., the
	// management plane must ensure that clusters referenced by a route
	// are in place, before pushing the updates for a route.
}

// xDSUpdateOperation is one step of an xDS update pass: a resource type plus
// the kinds of change (upsert and/or remove) to send for it in that step.
type xDSUpdateOperation struct {
	// TypeUrl is the xDS type URL the step applies to.
	TypeUrl string
	// Upsert requests that upserts be sent during this step.
	Upsert bool
	// Remove requests that removals be sent during this step.
	Remove bool
}

// errorLogNameReplyPrefix describes the operation for use in error messages.
// The returned word ("upsert", "remove" or "upsert/remove") carries a trailing
// space so it can be placed directly in front of "reply"; an operation with
// neither flag set yields the empty string.
func (op *xDSUpdateOperation) errorLogNameReplyPrefix() string {
	if op.Upsert {
		if op.Remove {
			return "upsert/remove "
		}
		return "upsert "
	}
	if op.Remove {
		return "remove "
	}
	return ""
}

// xDSDeltaChild links a parent xDS type to the child type whose resources
// Envoy stores inside the parent (for example, endpoints within clusters).
type xDSDeltaChild struct {
	// childType is a type that in Envoy is actually stored within this type.
	// Upserts of THIS type should potentially trigger dependent named
	// resources within the child to be re-configured.
	childType *xDSDeltaType

	// childrenNames is a map of parent resource names to a list of associated
	// child resource names.
	childrenNames map[string][]string
}

// xDSDeltaType tracks the per-stream delta xDS state for a single resource
// type: what envoy has subscribed to, which versions it has confirmed, and
// which updates are still awaiting an ACK.
type xDSDeltaType struct {
	// logger is used for all logging done by this type's handler.
	logger hclog.Logger
	// stream is the ADS delta stream that responses are sent on.
	stream ADSDeltaStream
	// typeURL is the xDS type URL this handler is responsible for.
	typeURL string
	// allowEmptyFn, when non-nil, is consulted to decide whether a response
	// may be sent while there are no resources of this type.
	allowEmptyFn func() bool

	// deltaChild contains data for an xDS child type if there is one.
	// For example, endpoints are a child type of clusters.
	deltaChild *xDSDeltaChild

	// registered indicates if this type has been requested at least once by
	// the proxy
	registered bool

	// wildcard indicates that this type was requested with no preference for
	// specific resource names. subscribe/unsubscribe are ignored.
	wildcard bool

	// sentToEnvoyOnce is set to true by ack, i.e. once envoy has ACKed a
	// response whose updates were still pending.
	sentToEnvoyOnce bool

	// subscriptions is the set of currently subscribed envoy resources.
	// If wildcard == true, this will be empty.
	subscriptions map[string]struct{}

	// resourceVersions is the current view of CONFIRMED/ACKed updates to
	// envoy's view of the loaded resources.
	//
	// name => version
	resourceVersions map[string]string

	// pendingUpdates is a set of un-ACKed updates to the 'resourceVersions'
	// map. Once we get an ACK from envoy we'll update the resourceVersions map
	// and strike the entry from this map.
	//
	// nonce -> name -> {version}
	pendingUpdates map[string]map[string]PendingUpdate
}

// subscribed reports whether envoy is interested in the named resource. In
// wildcard mode every name counts as subscribed; otherwise the name must be
// present in the subscriptions set.
func (t *xDSDeltaType) subscribed(name string) bool {
	if !t.wildcard {
		_, found := t.subscriptions[name]
		return found
	}
	return true
}

// PendingUpdate describes a change to a single named resource that has been
// recorded as pending until envoy ACKs or NACKs it.
type PendingUpdate struct {
	// Remove marks the update as a removal: on ACK the resource is deleted
	// from the tracked resource versions instead of being recorded.
	Remove bool
	// Version is the resource version recorded for the resource on ACK when
	// Remove is false.
	Version string
}

// newDeltaType creates the delta state tracker for one xDS type URL on the
// given stream. All bookkeeping maps start out empty (but non-nil) and the
// handler is neither registered nor in wildcard mode. allowEmptyFn may be nil.
func newDeltaType(
	logger hclog.Logger,
	stream ADSDeltaStream,
	typeUrl string,
	allowEmptyFn func() bool,
) *xDSDeltaType {
	dt := new(xDSDeltaType)

	dt.logger = logger
	dt.stream = stream
	dt.typeURL = typeUrl
	dt.allowEmptyFn = allowEmptyFn

	dt.subscriptions = map[string]struct{}{}
	dt.resourceVersions = map[string]string{}
	dt.pendingUpdates = map[string]map[string]PendingUpdate{}

	return dt
}

// Recv handles new discovery requests from envoy.
//
// It returns deltaRecvUnknownType when called on a nil handler,
// deltaRecvResponseNack when the request NACKs an earlier response,
// deltaRecvNewSubscription when this is the first request seen for the type
// (and it is not a NACK), and deltaRecvResponseAck in every other case.
//
// NOTE(review): sf is not referenced anywhere in this method body.
func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf xdscommon.SupportedProxyFeatures) deltaRecvResponse {
	if t == nil {
		return deltaRecvUnknownType // not something we care about
	}

	registeredThisTime := false
	if !t.registered {
		// We are in the wildcard mode if the first request of a particular
		// type has empty subscription list
		t.wildcard = len(req.ResourceNamesSubscribe) == 0
		t.registered = true
		registeredThisTime = true
	}

	/*
		DeltaDiscoveryRequest can be sent in the following situations:

		Initial message in a xDS bidirectional gRPC stream.

		As an ACK or NACK response to a previous DeltaDiscoveryResponse. In
		this case the response_nonce is set to the nonce value in the Response.
		ACK or NACK is determined by the absence or presence of error_detail.

		Spontaneous DeltaDiscoveryRequests from the client. This can be done to
		dynamically add or remove elements from the tracked resource_names set.
		In this case response_nonce must be omitted.

	*/

	/*
		DeltaDiscoveryRequest plays two independent roles. Any
		DeltaDiscoveryRequest can be either or both of:
	*/

	if req.ResponseNonce != "" {
		/*
			[2] (N)ACKing an earlier resource update from the server (using
			response_nonce, with presence of error_detail making it a NACK).
		*/
		if req.ErrorDetail == nil {
			t.logger.Trace("got ok response from envoy proxy", "nonce", req.ResponseNonce)
			t.ack(req.ResponseNonce)
		} else {
			t.logger.Error("got error response from envoy proxy", "nonce", req.ResponseNonce,
				"error", status.ErrorProto(req.ErrorDetail))
			t.nack(req.ResponseNonce)
			// A NACK ends processing here: subscription changes carried by
			// the same request are not applied.
			return deltaRecvResponseNack
		}
	}

	if registeredThisTime && len(req.InitialResourceVersions) > 0 {
		/*
			Additionally, the first message (for a given type_url) of a
			reconnected gRPC stream has a third role:

			[3] informing the server of the resources (and their versions) that
			the client already possesses, using the initial_resource_versions
			field.
		*/
		t.logger.Trace("setting initial resource versions for stream",
			"resources", req.InitialResourceVersions)
		// The request's map is adopted directly (not copied) as our view of
		// what envoy already has.
		t.resourceVersions = req.InitialResourceVersions
		if !t.wildcard {
			for k := range req.InitialResourceVersions {
				t.subscriptions[k] = struct{}{}
			}
		}
	}

	if !t.wildcard {
		/*
			[1] informing the server of what resources the client has
			gained/lost interest in (using resource_names_subscribe and
			resource_names_unsubscribe), or
		*/
		for _, name := range req.ResourceNamesSubscribe {
			// A resource_names_subscribe field may contain resource names that
			// the server believes the client is already subscribed to, and
			// furthermore has the most recent versions of. However, the server
			// must still provide those resources in the response; due to
			// implementation details hidden from the server, the client may
			// have “forgotten” those resources despite apparently remaining
			// subscribed.
			//
			// NOTE: the server must respond with all resources listed in
			// resource_names_subscribe, even if it believes the client has the
			// most recent version of them. The reason: the client may have
			// dropped them, but then regained interest before it had a chance
			// to send the unsubscribe message.
			//
			// We handle that here by ALWAYS wiping the version so the diff
			// decides to send the value.
			_, alreadySubscribed := t.subscriptions[name]
			t.subscriptions[name] = struct{}{}

			// Reset the tracked version so we force a reply.
			if _, alreadyTracked := t.resourceVersions[name]; alreadyTracked {
				t.resourceVersions[name] = ""
			}

			// Certain xDS types are children of other types. When Envoy
			// (re-)subscribes to a parent, we MUST assume that if Envoy ever had
			// data for the children of this parent, then the child's data is gone.
			if t.deltaChild != nil && t.deltaChild.childType.registered {
				for _, childName := range t.deltaChild.childrenNames[name] {
					t.ensureChildResend(name, childName)
				}
			}

			if alreadySubscribed {
				t.logger.Trace("re-subscribing resource for stream", "resource", name)
			} else {
				t.logger.Trace("subscribing resource for stream", "resource", name)
			}
		}

		for _, name := range req.ResourceNamesUnsubscribe {
			if _, ok := t.subscriptions[name]; !ok {
				continue
			}
			delete(t.subscriptions, name)
			t.logger.Trace("unsubscribing resource for stream", "resource", name)
			// NOTE: we'll let the normal differential comparison handle cleaning up resourceVersions
		}
	}

	if registeredThisTime {
		return deltaRecvNewSubscription
	}
	return deltaRecvResponseAck
}

// ack commits the updates that were pending under nonce: removals are dropped
// from resourceVersions and every other update records its version there. The
// pending entry is then discarded and sentToEnvoyOnce is set. A nonce with no
// pending updates is ignored.
func (t *xDSDeltaType) ack(nonce string) {
	acked, found := t.pendingUpdates[nonce]
	if !found {
		return
	}

	for resourceName, update := range acked {
		if update.Remove {
			delete(t.resourceVersions, resourceName)
		} else {
			t.resourceVersions[resourceName] = update.Version
		}
	}

	delete(t.pendingUpdates, nonce)
	t.sentToEnvoyOnce = true
}

// nack discards the updates that were pending under nonce without committing
// any of them; resourceVersions is left untouched. A nonce with no pending
// updates is a no-op.
func (t *xDSDeltaType) nack(nonce string) {
	delete(t.pendingUpdates, nonce)
}

// SendIfNew builds a delta response for this type from currentVersions and,
// if there is anything to say, sends it on the stream.
//
// upsert and remove select which kinds of changes may be included. nonce is
// incremented in place and its new value (formatted as 8 hex digits) is used as
// the response nonce.
//
// It returns (nil, false) when nothing was sent, (err, false) when building or
// sending the response failed, and (nil, true) once the response has been sent
// and recorded in pendingUpdates.
func (t *xDSDeltaType) SendIfNew(
	currentVersions map[string]string, // name => version (as consul knows right now)
	resourceMap *xdscommon.IndexedResources,
	nonce *uint64,
	upsert, remove bool,
) (error, bool) {
	switch {
	case t == nil || !t.registered:
		return nil, false
	case len(t.pendingUpdates) > 0:
		// Envoy has not caught up with this delta type yet; do not pile on.
		return nil, false
	}

	typeLogger := t.logger.With("typeUrl", t.typeURL)

	emptyAllowed := false
	if t.allowEmptyFn != nil {
		emptyAllowed = t.allowEmptyFn()
	}

	// An empty version set means there is no data yet, so there is nothing to
	// send unless this type explicitly permits empty responses. (Historically,
	// zero healthy endpoints for an upstream left Envoy hanging on EDS; that is
	// handled by emitting an explicit empty LoadAssignment for the cluster
	// instead of relying on empty responses.)
	if !emptyAllowed && len(currentVersions) == 0 {
		return nil, false
	}

	resp, updates, err := t.createDeltaResponse(currentVersions, resourceMap, upsert, remove)
	switch {
	case err != nil:
		return err, false
	case resp == nil:
		return nil, false
	}

	*nonce++
	resp.Nonce = fmt.Sprintf("%08x", *nonce)

	logTraceResponse(t.logger, "Incremental xDS v3", resp)

	typeLogger.Trace("sending response", "nonce", resp.Nonce)
	if sendErr := t.stream.Send(resp); sendErr != nil {
		return sendErr, false
	}
	typeLogger.Trace("sent response", "nonce", resp.Nonce)

	// When a parent resource is pushed, all of its children must be sent again:
	// Envoy will NOT re-subscribe to child data after a parent update, so the
	// resend has to be triggered from this side.
	//
	// The child type's registration is deliberately not checked here. Parent
	// types are sent before child types, so on the first send of a parent it is
	// expected that the child type has no subscriptions yet.
	if child := t.deltaChild; child != nil {
		for parentName := range updates {
			childNames, found := resourceMap.ChildIndex[t.typeURL][parentName]
			if !found {
				continue
			}

			// Remember which children hang off this parent so they can be
			// located again if Envoy ever re-subscribes to the parent.
			child.childrenNames[parentName] = childNames

			for _, childName := range childNames {
				t.ensureChildResend(parentName, childName)
			}
		}
	}

	t.pendingUpdates[resp.Nonce] = updates

	return nil, true
}

// createDeltaResponse diffs currentVersions against the versions recorded in
// t.resourceVersions and turns the difference into a DeltaDiscoveryResponse.
//
// For wildcard types every name in either map is considered: names missing
// from currentVersions become removals, and changed or new names become
// upserts. For non-wildcard types only subscribed names that exist in
// currentVersions are considered, and no removals are generated.
//
// upsert and remove gate which kinds of changes count as relevant and which
// are placed in the response. If nothing relevant was found and this type has
// already sent to Envoy once, all three results are nil. Otherwise it returns
// the response together with the updates actually included in it.
func (t *xDSDeltaType) createDeltaResponse(
	currentVersions map[string]string, // name => version (as consul knows right now)
	resourceMap *xdscommon.IndexedResources,
	upsert, remove bool,
) (*envoy_discovery_v3.DeltaDiscoveryResponse, map[string]PendingUpdate, error) {
	relevant := false
	candidates := make(map[string]PendingUpdate)

	if t.wildcard {
		// Pass 1: everything Envoy is tracked as holding that changed or vanished.
		for name, sentVersion := range t.resourceVersions {
			latest, exists := currentVersions[name]
			switch {
			case !exists:
				candidates[name] = PendingUpdate{Remove: true}
				relevant = relevant || remove
			case latest != sentVersion:
				candidates[name] = PendingUpdate{Version: latest}
				relevant = relevant || upsert
			}
		}

		// Pass 2: everything that has no tracked version yet.
		for name, latest := range currentVersions {
			if _, tracked := t.resourceVersions[name]; tracked {
				continue
			}
			candidates[name] = PendingUpdate{Version: latest}
			relevant = relevant || upsert
		}
	} else {
		// Pass 1: tracked resources that are still subscribed and whose
		// version differs from the current one.
		for name, sentVersion := range t.resourceVersions {
			if !t.subscribed(name) {
				continue
			}
			latest, exists := currentVersions[name]
			if !exists || latest == sentVersion {
				continue
			}
			candidates[name] = PendingUpdate{Version: latest}
			relevant = relevant || upsert
		}

		// Pass 2: subscriptions with no tracked version that now have data.
		for name := range t.subscriptions {
			if _, tracked := t.resourceVersions[name]; tracked {
				continue
			}
			latest, exists := currentVersions[name]
			if !exists {
				continue
			}
			candidates[name] = PendingUpdate{Version: latest}
			relevant = relevant || upsert
		}
	}

	if t.sentToEnvoyOnce && !relevant {
		return nil, nil, nil
	}

	// Assemble the discovery response from the candidates permitted by the
	// upsert/remove flags.
	resp := &envoy_discovery_v3.DeltaDiscoveryResponse{
		// TODO(rb): consider putting something in SystemVersionInfo?
		TypeUrl: t.typeURL,
	}
	included := make(map[string]PendingUpdate)
	for name, candidate := range candidates {
		if candidate.Remove {
			if !remove {
				continue
			}
			resp.RemovedResources = append(resp.RemovedResources, name)
			included[name] = PendingUpdate{Remove: true}
			continue
		}

		if !upsert {
			continue
		}

		byName, typeKnown := resourceMap.Index[t.typeURL]
		if !typeKnown {
			return nil, nil, fmt.Errorf("unknown type url: %s", t.typeURL)
		}
		msg, nameKnown := byName[name]
		if !nameKnown {
			return nil, nil, fmt.Errorf("unknown name for type url %q: %s", t.typeURL, name)
		}
		packed, err := anypb.New(msg)
		if err != nil {
			return nil, nil, err
		}

		resp.Resources = append(resp.Resources, &envoy_discovery_v3.Resource{
			Name:     name,
			Resource: packed,
			Version:  candidate.Version,
		})
		included[name] = candidate
	}

	return resp, included, nil
}

// ensureChildResend marks childName (a resource of this type's child type) as
// needing to be sent again because its parent parentName was updated or
// re-subscribed. It blanks the child's tracked version and, unless the legacy
// escape hatch is enabled, also strips the child from every not-yet-ACKed
// pending update of the child type.
func (t *xDSDeltaType) ensureChildResend(parentName, childName string) {
	// NOTE(review): the subscription check runs against t (the parent type),
	// while the state reset below belongs to the child type — confirm this is
	// intended.
	if !t.subscribed(childName) {
		return
	}

	child := t.deltaChild.childType

	t.logger.Trace(
		"triggering implicit update of resource",
		"typeUrl", t.typeURL,
		"resource", parentName,
		"childTypeUrl", child.typeURL,
		"childResource", childName,
	)

	// resourceVersions holds the last version of childName that Envoy ACKed.
	// Blanking it records that Envoy has no data for the child, so the next
	// diff will re-send it. Names that were never tracked are left untouched.
	if _, tracked := child.resourceVersions[childName]; tracked {
		child.resourceVersions[childName] = ""
	}

	// TODO: This legacy behavior can be removed in 1.19, provided there are no outstanding issues.
	//
	// In legacy mode, there is a confirmed race condition:
	// - Send update endpoints
	// - Send update cluster
	// - Recv ACK endpoints
	// - Recv ACK cluster
	//
	// When this happens, Envoy wipes the child endpoints when the cluster is
	// updated, but never receives new ones. The endpoints are not resent
	// because their hash has not changed since the previous ACK.
	//
	// Due to ambiguity with the Envoy protocol [https://github.com/envoyproxy/envoy/issues/13009],
	// it's difficult to state with certainty that no other unexpected side-effects are possible.
	// The legacy escape hatch is left in place in case some other complex race condition crops up.
	//
	// Longer-term, we should modify the hash of children to include the parent hash so that this
	// behavior is implicitly handled, rather than being an edge case.
	if xdsProtocolLegacyChildResend {
		return
	}

	// pendingUpdates may hold newer versions that were already sent to Envoy
	// but not yet ACKed. Remove the child from each of them so that ack() does
	// not copy those versions back into resourceVersions.
	for _, pending := range child.pendingUpdates {
		delete(pending, childName)
	}
}

// computeResourceVersions hashes every resource in resourceMap.Index and
// returns the result as typeURL => resourceName => version hash.
//
// If any resource fails to hash, it returns a nil map and an error that names
// the type URL and wraps the underlying cause.
func computeResourceVersions(resourceMap *xdscommon.IndexedResources) (map[string]map[string]string, error) {
	out := make(map[string]map[string]string, len(resourceMap.Index))
	for typeURL, resources := range resourceMap.Index {
		m, err := hashResourceMap(resources)
		if err != nil {
			// Wrap with %w so callers can still inspect the cause via errors.Is/As.
			return nil, fmt.Errorf("failed to hash resources for %q: %w", typeURL, err)
		}
		out[typeURL] = m
	}
	return out, nil
}

// populateChildIndexMap fills resourceMap.ChildIndex with the parent => children
// name relationships derived from resourceMap.Index:
//
//   - each listener maps to the RDS route names extracted from it;
//   - each cluster maps to a single child carrying the same name.
//
// It returns an error if a resource stored under the listener type is not a
// *envoy_listener_v3.Listener, or if extracting the route names fails.
//
// NOTE(review): this writes into ChildIndex[ListenerType] and
// ChildIndex[ClusterType] directly, so it assumes both inner maps are already
// initialized by whoever built resourceMap — confirm against the constructor.
func populateChildIndexMap(resourceMap *xdscommon.IndexedResources) error {
	// LDS and RDS have a more complicated relationship.
	for name, res := range resourceMap.Index[xdscommon.ListenerType] {
		// Use a checked assertion: a wrongly-typed resource is reported as an
		// error rather than panicking.
		listener, ok := res.(*envoy_listener_v3.Listener)
		if !ok {
			return fmt.Errorf("unexpected resource type %T for listener %q", res, name)
		}
		rdsRouteNames, err := extractRdsResourceNames(listener)
		if err != nil {
			return err
		}
		resourceMap.ChildIndex[xdscommon.ListenerType][name] = rdsRouteNames
	}

	// CDS and EDS share exact names.
	for name := range resourceMap.Index[xdscommon.ClusterType] {
		resourceMap.ChildIndex[xdscommon.ClusterType][name] = []string{name}
	}

	return nil
}

// hashResourceMap returns a map from each resource name to the hash of that
// resource as computed by hashResource.
//
// If any resource fails to hash, it returns a nil map and an error that names
// the resource and wraps the underlying cause.
func hashResourceMap(resources map[string]proto.Message) (map[string]string, error) {
	m := make(map[string]string, len(resources))
	for name, res := range resources {
		h, err := hashResource(res)
		if err != nil {
			// Wrap with %w so callers can still inspect the cause via errors.Is/As.
			return nil, fmt.Errorf("failed to hash resource %q: %w", name, err)
		}
		m[name] = h
	}
	return m, nil
}

// hashResource marshals res and returns the hex-encoded SHA-256 digest of the
// resulting bytes. A marshaling failure is returned as-is with an empty hash.
func hashResource(res proto.Message) (string, error) {
	// Request deterministic marshaling so the hash is stable for equal messages.
	encoded, err := proto.MarshalOptions{Deterministic: true}.Marshal(res)
	if err != nil {
		return "", err
	}

	digest := sha256.Sum256(encoded)
	return hex.EncodeToString(digest[:]), nil
}