// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package topology

import (
	"context"
	"fmt"
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go"

	"github.com/hashicorp/consul/api"

	libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
	libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
	libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
)

// Peer names passed to PeerWithCluster when BasicPeeringTwoClustersSetup
// links the two clusters.
const (
	// AcceptingPeerName is the peering name checked through the accepting
	// cluster's API client and used as the export target for static-server.
	AcceptingPeerName = "accepting-to-dialer"
	// DialingPeerName is the peering name used on the dialing cluster's side:
	// it is handed to the static-client sidecar and appears in the upstream
	// cluster name asserted in BasicPeeringTwoClustersSetup.
	DialingPeerName   = "dialing-to-acceptor"
)

// BuiltCluster bundles the objects created for one side of the peering built
// by BasicPeeringTwoClustersSetup.
type BuiltCluster struct {
	// Cluster is the cluster returned by NewCluster.
	Cluster   *libcluster.Cluster
	// Context is the build context returned by NewCluster for this cluster.
	Context   *libcluster.BuildContext
	// Service is the static-server sidecar on the accepting side and nil on
	// the dialing side.
	Service   libservice.Service
	// Container is the static-server sidecar on the accepting side and the
	// static-client sidecar on the dialing side.
	Container libservice.Service
	// Gateway is the mesh gateway created on the cluster's first client agent.
	Gateway   libservice.Service
}

// PeeringClusterSize holds the number of server and client agents that
// BasicPeeringTwoClustersSetup requests from NewCluster for each cluster.
type PeeringClusterSize struct {
	// AcceptingNumServers is the server count of the accepting cluster.
	AcceptingNumServers int
	// AcceptingNumClients is the client count of the accepting cluster.
	AcceptingNumClients int
	// DialingNumServers is the server count of the dialing cluster.
	DialingNumServers   int
	// DialingNumClients is the client count of the dialing cluster.
	DialingNumClients   int
}

// BasicPeeringTwoClustersSetup sets up a scenario for testing peering. It
//
//   - builds an accepting cluster (datacenter "dc1") and a dialing cluster
//     (datacenter "dc2") whose server and client counts come from pcs. Each
//     cluster needs at least one client agent: the first client hosts the mesh
//     gateway and the test service.
//   - creates a mesh gateway in each cluster and, when peeringThroughMeshgateway
//     is true, writes a mesh config entry with PeerThroughMeshGateways enabled
//     to both clusters.
//   - creates the peering, registers static-server with its sidecar in the
//     accepting cluster and exports it to the peer, registers the static-client
//     sidecar in the dialing cluster, and verifies service connectivity.
//
// Any failure aborts the test through t.
//
// It returns the BuiltCluster of the accepting cluster followed by the
// BuiltCluster of the dialing cluster. On the accepting side both Service and
// Container hold the static-server sidecar; on the dialing side Service is nil
// and Container holds the static-client sidecar.
func BasicPeeringTwoClustersSetup(
	t *testing.T,
	consulImage string,
	consulVersion string,
	pcs PeeringClusterSize,
	peeringThroughMeshgateway bool,
) (*BuiltCluster, *BuiltCluster) {
	// The first client agent of each cluster is indexed below (Clients()[0]).
	// Fail fast with a readable message instead of panicking with an index out
	// of range after the containers have already been started.
	require.Greater(t, pcs.AcceptingNumClients, 0, "accepting cluster needs at least one client agent")
	require.Greater(t, pcs.DialingNumClients, 0, "dialing cluster needs at least one client agent")

	acceptingCluster, acceptingCtx, acceptingClient := NewCluster(t, &ClusterConfig{
		NumServers: pcs.AcceptingNumServers,
		NumClients: pcs.AcceptingNumClients,
		BuildOpts: &libcluster.BuildOptions{
			Datacenter:           "dc1",
			ConsulImageName:      consulImage,
			ConsulVersion:        consulVersion,
			InjectAutoEncryption: true,
		},
		ApplyDefaultProxySettings: true,
	})

	dialingCluster, dialingCtx, dialingClient := NewCluster(t, &ClusterConfig{
		NumServers: pcs.DialingNumServers,
		NumClients: pcs.DialingNumClients,
		BuildOpts: &libcluster.BuildOptions{
			Datacenter:           "dc2",
			ConsulImageName:      consulImage,
			ConsulVersion:        consulVersion,
			InjectAutoEncryption: true,
		},
		ApplyDefaultProxySettings: true,
	})

	// Create the mesh gateway for dataplane traffic and peering control plane traffic (if enabled)
	ctx := context.Background()
	gwCfg := libservice.GatewayConfig{
		Name: "mesh",
		Kind: "mesh",
	}
	acceptingClusterGateway, err := libservice.NewGatewayService(ctx, gwCfg, acceptingCluster.Clients()[0])
	require.NoError(t, err)
	dialingClusterGateway, err := libservice.NewGatewayService(ctx, gwCfg, dialingCluster.Clients()[0])
	require.NoError(t, err)

	// Enable peering control plane traffic through mesh gateway
	if peeringThroughMeshgateway {
		req := &api.MeshConfigEntry{
			Peering: &api.PeeringMeshConfig{
				PeerThroughMeshGateways: true,
			},
		}
		configCluster := func(cli *api.Client) error {
			// The mesh gateway must be in the catalog before the entry is written.
			libassert.CatalogServiceExists(t, cli, "mesh", nil)
			ok, _, err := cli.ConfigEntries().Set(req, &api.WriteOptions{})
			// Check the error first: checking ok first would hide the real
			// cause of a failed write behind the generic "not set" message.
			if err != nil {
				return fmt.Errorf("error writing config entry: %w", err)
			}
			if !ok {
				return fmt.Errorf("config entry is not set")
			}
			return nil
		}
		require.NoError(t, configCluster(dialingClient))
		require.NoError(t, configCluster(acceptingClient))
	}

	require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))

	libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
	// TODO(review): an assertion on the number of peering exports was disabled
	// here (libassert.PeeringExports(t, acceptingClient, AcceptingPeerName, 1));
	// confirm whether it should be restored after the export below.

	// Register an static-server service in acceptingCluster and export to dialing cluster
	var serverService, serverSidecarService libservice.Service
	{
		clientNode := acceptingCluster.Clients()[0]

		// Create a service and proxy instance
		serviceOpts := libservice.ServiceOpts{
			Name:     libservice.StaticServerServiceName,
			ID:       "static-server",
			Meta:     map[string]string{"version": ""},
			HTTPPort: 8080,
			GRPCPort: 8079,
		}
		serverService, serverSidecarService, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts)
		require.NoError(t, err)

		libassert.CatalogServiceExists(t, acceptingClient, libservice.StaticServerServiceName, nil)
		libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy", nil)

		require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
	}

	// Register an static-client service in dialing cluster and set upstream to static-server service
	var clientSidecarService *libservice.ConnectContainer
	{
		clientNode := dialingCluster.Clients()[0]

		// Create a service and proxy instance
		clientSidecarService, err = libservice.CreateAndRegisterStaticClientSidecar(clientNode, DialingPeerName, true, false)
		require.NoError(t, err)

		libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy", nil)
	}

	// Verify connectivity from the static-client sidecar to the exported service.
	_, adminPort := clientSidecarService.GetAdminAddr()
	libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", DialingPeerName), "HEALTHY", 1)
	_, port := clientSidecarService.GetAddr()
	libassert.HTTPServiceEchoes(t, "localhost", port, "")
	libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")

	return &BuiltCluster{
			Cluster:   acceptingCluster,
			Context:   acceptingCtx,
			Service:   serverSidecarService,
			Container: serverSidecarService,
			Gateway:   acceptingClusterGateway,
		},
		&BuiltCluster{
			Cluster:   dialingCluster,
			Context:   dialingCtx,
			Service:   nil,
			Container: clientSidecarService,
			Gateway:   dialingClusterGateway,
		}
}

// ClusterConfig describes the cluster that NewCluster builds.
type ClusterConfig struct {
	// NumServers is the number of server agents; NewCluster requires it to be > 0.
	NumServers                int
	// NumClients is the number of client agents added after the servers.
	NumClients                int
	// ApplyDefaultProxySettings makes NewCluster call
	// utils.ApplyDefaultProxySettings once the cluster is up.
	ApplyDefaultProxySettings bool
	// BuildOpts supplies the build options NewCluster copies into its own
	// libcluster.BuildOptions; NewCluster requires a non-empty Datacenter.
	BuildOpts                 *libcluster.BuildOptions
	// Cmd, when non-empty, is appended to the server agent command.
	Cmd                       string
	// LogConsumer, when non-nil, is installed on the server agent config.
	LogConsumer               *TestLogConsumer
	// Ports, when non-nil, is forwarded to libcluster.New when creating the
	// server side of the cluster.
	Ports                     []int
}

// NewCluster creates a cluster with peering enabled on every agent. The cluster
// consists of config.NumServers servers and config.NumClients clients that
// retry-join those servers.
//
// config and config.BuildOpts must be non-nil, config.BuildOpts.Datacenter must
// be non-empty and config.NumServers must be positive; otherwise the test is
// failed immediately.
//
// The API client returned points at the agent at index config.NumServers (the
// first client agent added) when config.NumClients > 0, and at the first agent
// otherwise. When config.ApplyDefaultProxySettings is set,
// utils.ApplyDefaultProxySettings is applied through that client (per the
// original note this sets proxy-defaults.protocol = tcp; confirm against utils).
//
// It returns the cluster, the build context used to create it, and the API client.
func NewCluster(
	t *testing.T,
	config *ClusterConfig,
) (*libcluster.Cluster, *libcluster.BuildContext, *api.Client) {
	// Validate up front so a misconfigured call fails the test with a message
	// instead of panicking on a nil dereference.
	require.NotNil(t, config, "cluster config must not be nil")
	require.NotNil(t, config.BuildOpts, "ClusterConfig.BuildOpts must not be nil")
	require.NotEmpty(t, config.BuildOpts.Datacenter)
	require.Greater(t, config.NumServers, 0)

	opts := libcluster.BuildOptions{
		Datacenter:             config.BuildOpts.Datacenter,
		InjectAutoEncryption:   config.BuildOpts.InjectAutoEncryption,
		InjectGossipEncryption: true,
		AllowHTTPAnyway:        true,
		// Forward the image name chosen by the caller; leaving it out silently
		// dropped the value and left the field at its zero value.
		ConsulImageName: config.BuildOpts.ConsulImageName,
		ConsulVersion:   config.BuildOpts.ConsulVersion,
		ACLEnabled:      config.BuildOpts.ACLEnabled,
		LogStore:        config.BuildOpts.LogStore,
	}
	ctx := libcluster.NewBuildContext(t, opts)

	serverConf := libcluster.NewConfigBuilder(ctx).
		Bootstrap(config.NumServers).
		Peering(true).
		ToAgentConfig(t)
	t.Logf("%s server config: \n%s", opts.Datacenter, serverConf.JSON)

	// optional
	if config.LogConsumer != nil {
		serverConf.LogConsumer = config.LogConsumer
	}

	// optional custom cmd
	if config.Cmd != "" {
		serverConf.Cmd = append(serverConf.Cmd, config.Cmd)
	}

	var (
		cluster *libcluster.Cluster
		err     error
	)
	if config.Ports != nil {
		// NOTE(review): only a single server config is passed on this path,
		// regardless of config.NumServers; confirm it is only used with
		// NumServers == 1.
		cluster, err = libcluster.New(t, []libcluster.Config{*serverConf}, config.Ports...)
	} else {
		cluster, err = libcluster.NewN(t, *serverConf, config.NumServers)
	}
	require.NoError(t, err)
	// builder generates certs for us, so copy them back
	if opts.InjectAutoEncryption {
		cluster.CACert = serverConf.CACert
	}

	// Clients join the servers through the names agent-0 .. agent-(NumServers-1).
	retryJoin := make([]string, 0, config.NumServers)
	for i := 0; i < config.NumServers; i++ {
		retryJoin = append(retryJoin, fmt.Sprintf("agent-%d", i))
	}

	// Add numClients static clients to register the service
	clientConf := libcluster.NewConfigBuilder(ctx).
		Client().
		Peering(true).
		RetryJoin(retryJoin...).
		ToAgentConfig(t)
	t.Logf("%s client config: \n%s", opts.Datacenter, clientConf.JSON)

	require.NoError(t, cluster.AddN(*clientConf, config.NumClients, true))

	// Use the client agent as the HTTP endpoint since we will not rotate it in many tests.
	// Without any client agent, fall back to the first agent of the cluster.
	var client *api.Client
	if config.NumClients > 0 {
		client = cluster.Agents[config.NumServers].GetClient()
	} else {
		client = cluster.Agents[0].GetClient()
	}
	libcluster.WaitForLeader(t, cluster, client)
	libcluster.WaitForMembers(t, client, config.NumServers+config.NumClients)

	// Default Proxy Settings
	if config.ApplyDefaultProxySettings {
		ok, err := utils.ApplyDefaultProxySettings(client)
		require.NoError(t, err)
		require.True(t, ok)
	}

	return cluster, ctx, client
}

// TestLogConsumer collects container log output in memory. It can be passed
// as ClusterConfig.LogConsumer.
type TestLogConsumer struct {
	// Msgs holds the content of every log entry passed to Accept, in call order.
	Msgs []string
}

// Accept appends the content of the log entry l, converted to a string, to
// g.Msgs.
func (g *TestLogConsumer) Accept(l testcontainers.Log) {
	msg := string(l.Content)
	g.Msgs = append(g.Msgs, msg)
}