You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/test/integration/consul-container/libs/topology/peering_topology.go

314 lines
9.8 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topology
import (
"context"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/hashicorp/consul/api"
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
)
const (
AcceptingPeerName = "accepting-to-dialer"
DialingPeerName = "dialing-to-acceptor"
)
type BuiltCluster struct {
Cluster *libcluster.Cluster
Context *libcluster.BuildContext
Service libservice.Service
Container libservice.Service
Gateway libservice.Service
}
type PeeringClusterSize struct {
AcceptingNumServers int
AcceptingNumClients int
DialingNumServers int
DialingNumClients int
}
// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
//
// - an accepting cluster with 3 servers and 1 client agent. The client should be used to
// host a service for export: staticServerSvc.
// - a dialing cluster with 1 server and 1 client. The client should be used to host a
// service connecting to staticServerSvc.
// - Create the peering, export the service from accepting cluster, and verify service
// connectivity.
//
// It returns objects of the accepting cluster, dialing cluster, staticServerSvc, and staticClientSvcSidecar
func BasicPeeringTwoClustersSetup(
t *testing.T,
consulImage string,
consulVersion string,
pcs PeeringClusterSize,
peeringThroughMeshgateway bool,
) (*BuiltCluster, *BuiltCluster) {
acceptingCluster, acceptingCtx, acceptingClient := NewCluster(t, &ClusterConfig{
NumServers: pcs.AcceptingNumServers,
NumClients: pcs.AcceptingNumClients,
BuildOpts: &libcluster.BuildOptions{
Datacenter: "dc1",
ConsulImageName: consulImage,
ConsulVersion: consulVersion,
InjectAutoEncryption: true,
},
ApplyDefaultProxySettings: true,
})
dialingCluster, dialingCtx, dialingClient := NewCluster(t, &ClusterConfig{
NumServers: pcs.DialingNumServers,
NumClients: pcs.DialingNumClients,
BuildOpts: &libcluster.BuildOptions{
Datacenter: "dc2",
ConsulImageName: consulImage,
ConsulVersion: consulVersion,
InjectAutoEncryption: true,
},
ApplyDefaultProxySettings: true,
})
// Create the mesh gateway for dataplane traffic and peering control plane traffic (if enabled)
gwCfg := libservice.GatewayConfig{
Name: "mesh",
Kind: "mesh",
}
acceptingClusterGateway, err := libservice.NewGatewayService(context.Background(), gwCfg, acceptingCluster.Clients()[0])
require.NoError(t, err)
dialingClusterGateway, err := libservice.NewGatewayService(context.Background(), gwCfg, dialingCluster.Clients()[0])
require.NoError(t, err)
// Enable peering control plane traffic through mesh gateway
if peeringThroughMeshgateway {
req := &api.MeshConfigEntry{
Peering: &api.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}
configCluster := func(cli *api.Client) error {
libassert.CatalogServiceExists(t, cli, "mesh", nil)
ok, _, err := cli.ConfigEntries().Set(req, &api.WriteOptions{})
if !ok {
return fmt.Errorf("config entry is not set")
}
if err != nil {
return fmt.Errorf("error writing config entry: %s", err)
}
return nil
}
err = configCluster(dialingClient)
require.NoError(t, err)
err = configCluster(acceptingClient)
require.NoError(t, err)
}
require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))
libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
// libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
// Register a static-server service in acceptingCluster and export to dialing cluster
var serverService, serverSidecarService libservice.Service
{
clientNode := acceptingCluster.Clients()[0]
// Create a service and proxy instance
var err error
// Create a service and proxy instance
serviceOpts := libservice.ServiceOpts{
Name: libservice.StaticServerServiceName,
ID: "static-server",
Meta: map[string]string{"version": ""},
HTTPPort: 8080,
GRPCPort: 8079,
}
serverService, serverSidecarService, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts)
require.NoError(t, err)
libassert.CatalogServiceExists(t, acceptingClient, libservice.StaticServerServiceName, nil)
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy", nil)
require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
}
// Register a static-client service in dialing cluster and set upstream to static-server service
var clientSidecarService *libservice.ConnectContainer
{
clientNode := dialingCluster.Clients()[0]
// Create a service and proxy instance
var err error
clientSidecarService, err = libservice.CreateAndRegisterStaticClientSidecar(clientNode, DialingPeerName, true, false, nil)
require.NoError(t, err)
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy", nil)
}
_, adminPort := clientSidecarService.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", DialingPeerName), "HEALTHY", 1)
_, port := clientSidecarService.GetAddr()
libassert.HTTPServiceEchoes(t, "localhost", port, "")
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), libservice.StaticServerServiceName, "")
return &BuiltCluster{
Cluster: acceptingCluster,
Context: acceptingCtx,
Service: serverSidecarService,
Container: serverSidecarService,
Gateway: acceptingClusterGateway,
},
&BuiltCluster{
Cluster: dialingCluster,
Context: dialingCtx,
Service: nil,
Container: clientSidecarService,
Gateway: dialingClusterGateway,
}
}
type ClusterConfig struct {
NumServers int
NumClients int
ApplyDefaultProxySettings bool
BuildOpts *libcluster.BuildOptions
Cmd string
LogConsumer *TestLogConsumer
// Exposed Ports are available on the cluster's pause container for the purposes
// of adding external communication to the cluster. An example would be a listener
// on a gateway.
ExposedPorts []int
}
func NewCluster(
t *testing.T,
config *ClusterConfig,
) (*libcluster.Cluster, *libcluster.BuildContext, *api.Client) {
return NewClusterWithConfig(t, config, "", "")
}
// NewCluster creates a cluster with peering enabled. It also creates
// and registers a mesh-gateway at the client agent. The API client returned is
// pointed at the client agent.
// - proxy-defaults.protocol = tcp
func NewClusterWithConfig(
t *testing.T,
config *ClusterConfig,
serverHclConfig string,
clientHclConfig string,
) (*libcluster.Cluster, *libcluster.BuildContext, *api.Client) {
var (
cluster *libcluster.Cluster
err error
)
require.NotEmpty(t, config.BuildOpts.Datacenter)
require.True(t, config.NumServers > 0)
opts := libcluster.BuildOptions{
Datacenter: config.BuildOpts.Datacenter,
InjectAutoEncryption: config.BuildOpts.InjectAutoEncryption,
InjectGossipEncryption: true,
AllowHTTPAnyway: true,
ConsulVersion: config.BuildOpts.ConsulVersion,
ACLEnabled: config.BuildOpts.ACLEnabled,
LogStore: config.BuildOpts.LogStore,
}
ctx := libcluster.NewBuildContext(t, opts)
serverConf := libcluster.NewConfigBuilder(ctx).
Bootstrap(config.NumServers).
Peering(true).
ToAgentConfig(t)
t.Logf("%s server config: \n%s", opts.Datacenter, serverConf.JSON)
// optional
if config.LogConsumer != nil {
serverConf.LogConsumer = config.LogConsumer
}
t.Logf("Cluster config:\n%s", serverConf.JSON)
// optional custom cmd
if config.Cmd != "" {
serverConf.Cmd = append(serverConf.Cmd, config.Cmd)
}
if serverHclConfig != "" {
serverConf.MutatebyAgentConfig(serverHclConfig)
}
if config.ExposedPorts != nil {
cluster, err = libcluster.New(t, []libcluster.Config{*serverConf}, config.ExposedPorts...)
} else {
cluster, err = libcluster.NewN(t, *serverConf, config.NumServers)
}
require.NoError(t, err)
// builder generates certs for us, so copy them back
if opts.InjectAutoEncryption {
cluster.CACert = serverConf.CACert
}
var retryJoin []string
for i := 0; i < config.NumServers; i++ {
retryJoin = append(retryJoin, fmt.Sprintf("agent-%d", i))
}
// Add numClients static clients to register the service
configBuilder := libcluster.NewConfigBuilder(ctx).
Client().
Peering(true).
RetryJoin(retryJoin...)
if cluster.TokenBootstrap != "" {
configBuilder.SetACLToken(cluster.TokenBootstrap)
}
clientConf := configBuilder.ToAgentConfig(t)
t.Logf("%s client config: \n%s", opts.Datacenter, clientConf.JSON)
if clientHclConfig != "" {
clientConf.MutatebyAgentConfig(clientHclConfig)
}
require.NoError(t, cluster.AddN(*clientConf, config.NumClients, true))
// Use the client agent as the HTTP endpoint since we will not rotate it in many tests.
var client *api.Client
if config.NumClients > 0 {
clientNode := cluster.Agents[config.NumServers]
client = clientNode.GetClient()
} else {
client = cluster.Agents[0].GetClient()
}
libcluster.WaitForLeader(t, cluster, client)
libcluster.WaitForMembers(t, client, config.NumServers+config.NumClients)
// Default Proxy Settings
if config.ApplyDefaultProxySettings {
ok, err := utils.ApplyDefaultProxySettings(client)
require.NoError(t, err)
require.True(t, ok)
}
return cluster, ctx, client
}
type TestLogConsumer struct {
Msgs []string
}
func (g *TestLogConsumer) Accept(l testcontainers.Log) {
g.Msgs = append(g.Msgs, string(l.Content))
}