backport of commit 7e236479c0 (#16272)

Co-authored-by: cskh <hui.kang@hashicorp.com>
pull/16384/head
hc-github-team-consul-core 2023-02-22 15:58:17 -05:00 committed by GitHub
parent 76f2bc5c4c
commit 8ba11d7380
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 67 additions and 29 deletions

View File

@ -132,7 +132,7 @@ func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string
err error
)
failer := func() *retry.Timer {
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
return &retry.Timer{Timeout: 60 * time.Second, Wait: 500 * time.Millisecond}
}
retry.RunWith(failer(), t, func(r *retry.R) {

View File

@ -112,6 +112,13 @@ func (g ConnectContainer) Start() error {
return g.container.Start(g.ctx)
}
func (g ConnectContainer) Stop() error {
if g.container == nil {
return fmt.Errorf("container has not been initialized")
}
return g.container.Stop(context.Background(), nil)
}
func (g ConnectContainer) Terminate() error {
return cluster.TerminateContainer(g.ctx, g.container, true)
}

View File

@ -105,6 +105,13 @@ func (g exampleContainer) Start() error {
return g.container.Start(context.Background())
}
func (g exampleContainer) Stop() error {
if g.container == nil {
return fmt.Errorf("container has not been initialized")
}
return g.container.Stop(context.Background(), nil)
}
func (c exampleContainer) Terminate() error {
return cluster.TerminateContainer(c.ctx, c.container, true)
}

View File

@ -90,6 +90,13 @@ func (g gatewayContainer) Start() error {
return g.container.Start(context.Background())
}
func (g gatewayContainer) Stop() error {
if g.container == nil {
return fmt.Errorf("container has not been initialized")
}
return g.container.Stop(context.Background(), nil)
}
func (c gatewayContainer) Terminate() error {
return cluster.TerminateContainer(c.ctx, c.container, true)
}

View File

@ -19,6 +19,7 @@ type Service interface {
GetName() string
GetServiceName() string
Start() (err error)
Stop() (err error)
Terminate() error
Restart() error
GetStatus() (string, error)

View File

@ -41,6 +41,7 @@ type BuiltCluster struct {
func BasicPeeringTwoClustersSetup(
t *testing.T,
consulVersion string,
peeringThroughMeshgateway bool,
) (*BuiltCluster, *BuiltCluster) {
// acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, "dc1", 3, consulVersion, true)
acceptingCluster, acceptingCtx, acceptingClient := NewPeeringCluster(t, 3, &libcluster.BuildOptions{
@ -53,6 +54,38 @@ func BasicPeeringTwoClustersSetup(
ConsulVersion: consulVersion,
InjectAutoEncryption: true,
})
// Create the mesh gateway for dataplane traffic and peering control plane traffic (if enabled)
acceptingClusterGateway, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", acceptingCluster.Clients()[0])
require.NoError(t, err)
dialingClusterGateway, err := libservice.NewGatewayService(context.Background(), "mesh", "mesh", dialingCluster.Clients()[0])
require.NoError(t, err)
// Enable peering control plane traffic through mesh gateway
if peeringThroughMeshgateway {
req := &api.MeshConfigEntry{
Peering: &api.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}
configCluster := func(cli *api.Client) error {
libassert.CatalogServiceExists(t, cli, "mesh")
ok, _, err := cli.ConfigEntries().Set(req, &api.WriteOptions{})
if !ok {
return fmt.Errorf("config entry is not set")
}
if err != nil {
return fmt.Errorf("error writing config entry: %s", err)
}
return nil
}
err = configCluster(dialingClient)
require.NoError(t, err)
err = configCluster(acceptingClient)
require.NoError(t, err)
}
require.NoError(t, dialingCluster.PeerWithCluster(acceptingClient, AcceptingPeerName, DialingPeerName))
libassert.PeeringStatus(t, acceptingClient, AcceptingPeerName, api.PeeringStateActive)
@ -60,7 +93,6 @@ func BasicPeeringTwoClustersSetup(
// Register an static-server service in acceptingCluster and export to dialing cluster
var serverService, serverSidecarService libservice.Service
var acceptingClusterGateway libservice.Service
{
clientNode := acceptingCluster.Clients()[0]
@ -81,15 +113,10 @@ func BasicPeeringTwoClustersSetup(
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
// Create the mesh gateway for dataplane traffic
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)
}
// Register an static-client service in dialing cluster and set upstream to static-server service
var clientSidecarService *libservice.ConnectContainer
var dialingClusterGateway libservice.Service
{
clientNode := dialingCluster.Clients()[0]
@ -100,9 +127,6 @@ func BasicPeeringTwoClustersSetup(
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")
// Create the mesh gateway for dataplane traffic
dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
require.NoError(t, err)
}
_, adminPort := clientSidecarService.GetAdminAddr()

View File

@ -50,7 +50,7 @@ import (
func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
t.Parallel()
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.TargetVersion)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, utils.TargetVersion, false)
var (
acceptingCluster = accepting.Cluster
dialingCluster = dialing.Cluster

View File

@ -42,7 +42,7 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
}
run := func(t *testing.T, tc testcase) {
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion, true)
var (
acceptingCluster = accepting.Cluster
dialingCluster = dialing.Cluster
@ -54,19 +54,6 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
acceptingClient, err := acceptingCluster.GetClient(nil, false)
require.NoError(t, err)
// Enable peering control plane traffic through mesh gateway
req := &api.MeshConfigEntry{
Peering: &api.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
}
ok, _, err := dialingClient.ConfigEntries().Set(req, &api.WriteOptions{})
require.True(t, ok)
require.NoError(t, err)
ok, _, err = acceptingClient.ConfigEntries().Set(req, &api.WriteOptions{})
require.True(t, ok)
require.NoError(t, err)
// Verify control plane endpoints and traffic in gateway
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
@ -74,6 +61,9 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 1)
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.server.dc1.peering",
"upstream_cx_total", 1)
// Upgrade the accepting cluster and assert peering is still ACTIVE
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
@ -90,11 +80,12 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
// - Register a new static-client service in dialing cluster and
// - set upstream to static-server service in peered cluster
// Restart the gateway & proxy sidecar
// Stop the accepting gateway and restart dialing gateway
// to force peering control plane traffic through dialing mesh gateway
require.NoError(t, accepting.Gateway.Stop())
require.NoError(t, dialing.Gateway.Restart())
require.NoError(t, dialing.Container.Restart())
// Restarted gateway should not have any measurement on data plane traffic
// Restarted dialing gateway should not have any measurement on data plane traffic
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
"cluster.static-server.default.default.accepting-to-dialer.external",
"upstream_cx_total", 0)
@ -102,6 +93,7 @@ func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
"cluster.server.dc1.peering",
"upstream_cx_total", 1)
require.NoError(t, accepting.Gateway.Start())
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
require.NoError(t, err)

View File

@ -221,7 +221,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
}
run := func(t *testing.T, tc testcase) {
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion, false)
var (
acceptingCluster = accepting.Cluster
dialingCluster = dialing.Cluster