mirror of https://github.com/hashicorp/consul
Upgrade test: peering control plane traffic through mesh gateway (#16091)
parent
5fa9ab28dc
commit
ffb81782de
|
@ -2,12 +2,16 @@ package assert
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/url"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-cleanhttp"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
|
||||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -24,7 +28,7 @@ func GetEnvoyListenerTCPFilters(t *testing.T, adminPort int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
dump, err = libservice.GetEnvoyConfigDump(adminPort, "")
|
dump, err = GetEnvoyOutput(adminPort, "config_dump", map[string]string{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Fatal("could not fetch envoy configuration")
|
r.Fatal("could not fetch envoy configuration")
|
||||||
}
|
}
|
||||||
|
@ -61,18 +65,86 @@ func AssertUpstreamEndpointStatus(t *testing.T, adminPort int, clusterName, heal
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
clusters, err = libservice.GetEnvoyClusters(adminPort)
|
clusters, err = GetEnvoyOutput(adminPort, "clusters", map[string]string{"format": "json"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Fatal("could not fetch envoy configuration")
|
r.Fatal("could not fetch envoy clusters")
|
||||||
}
|
}
|
||||||
|
|
||||||
filter := fmt.Sprintf(`.cluster_statuses[] | select(.name|contains("%s")) | [.host_statuses[].health_status.eds_health_status] | [select(.[] == "%s")] | length`, clusterName, healthStatus)
|
filter := fmt.Sprintf(`.cluster_statuses[] | select(.name|contains("%s")) | [.host_statuses[].health_status.eds_health_status] | [select(.[] == "%s")] | length`, clusterName, healthStatus)
|
||||||
results, err := utils.JQFilter(clusters, filter)
|
results, err := utils.JQFilter(clusters, filter)
|
||||||
require.NoError(r, err, "could not parse envoy configuration")
|
require.NoErrorf(r, err, "could not found cluster name %s", clusterName)
|
||||||
require.Equal(r, count, len(results))
|
require.Equal(r, count, len(results))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AssertEnvoyMetricAtMost assert the filered metric by prefix and metric is >= count
|
||||||
|
func AssertEnvoyMetricAtMost(t *testing.T, adminPort int, prefix, metric string, count int) {
|
||||||
|
var (
|
||||||
|
stats string
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
failer := func() *retry.Timer {
|
||||||
|
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
|
||||||
|
}
|
||||||
|
|
||||||
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
|
stats, err = GetEnvoyOutput(adminPort, "stats", nil)
|
||||||
|
if err != nil {
|
||||||
|
r.Fatal("could not fetch envoy stats")
|
||||||
|
}
|
||||||
|
lines := strings.Split(stats, "\n")
|
||||||
|
err = processMetrics(lines, prefix, metric, func(v int) bool {
|
||||||
|
return v <= count
|
||||||
|
})
|
||||||
|
require.NoError(r, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func processMetrics(metrics []string, prefix, metric string, condition func(v int) bool) error {
|
||||||
|
for _, line := range metrics {
|
||||||
|
if strings.Contains(line, prefix) &&
|
||||||
|
strings.Contains(line, metric) {
|
||||||
|
|
||||||
|
metric := strings.Split(line, ":")
|
||||||
|
fmt.Println(metric[1])
|
||||||
|
|
||||||
|
v, err := strconv.Atoi(strings.TrimSpace(metric[1]))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("err parse metric value %s: %s", metric[1], err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if condition(v) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return fmt.Errorf("error processing stats")
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssertEnvoyMetricAtLeast assert the filered metric by prefix and metric is <= count
|
||||||
|
func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string, count int) {
|
||||||
|
var (
|
||||||
|
stats string
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
failer := func() *retry.Timer {
|
||||||
|
return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
|
||||||
|
}
|
||||||
|
|
||||||
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
|
stats, err = GetEnvoyOutput(adminPort, "stats", nil)
|
||||||
|
if err != nil {
|
||||||
|
r.Fatal("could not fetch envoy stats")
|
||||||
|
}
|
||||||
|
lines := strings.Split(stats, "\n")
|
||||||
|
|
||||||
|
err = processMetrics(lines, prefix, metric, func(v int) bool {
|
||||||
|
return v >= count
|
||||||
|
})
|
||||||
|
require.NoError(r, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// GetEnvoyHTTPrbacFilters validates that proxy was configured with an http connection manager
|
// GetEnvoyHTTPrbacFilters validates that proxy was configured with an http connection manager
|
||||||
// this assertion is currently unused current tests use http protocol
|
// this assertion is currently unused current tests use http protocol
|
||||||
func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
|
func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
|
||||||
|
@ -85,7 +157,7 @@ func GetEnvoyHTTPrbacFilters(t *testing.T, port int) {
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
dump, err = libservice.GetEnvoyConfigDump(port, "")
|
dump, err = GetEnvoyOutput(port, "config_dump", map[string]string{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Fatal("could not fetch envoy configuration")
|
r.Fatal("could not fetch envoy configuration")
|
||||||
}
|
}
|
||||||
|
@ -117,3 +189,33 @@ func sanitizeResult(s string) []string {
|
||||||
result := strings.Split(strings.ReplaceAll(s, `,`, " "), " ")
|
result := strings.Split(strings.ReplaceAll(s, `,`, " "), " ")
|
||||||
return append(result[:0], result[1:]...)
|
return append(result[:0], result[1:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetEnvoyOutput(port int, path string, query map[string]string) (string, error) {
|
||||||
|
client := cleanhttp.DefaultClient()
|
||||||
|
var u url.URL
|
||||||
|
u.Host = fmt.Sprintf("localhost:%d", port)
|
||||||
|
u.Scheme = "http"
|
||||||
|
if path != "" {
|
||||||
|
u.Path = path
|
||||||
|
}
|
||||||
|
q := u.Query()
|
||||||
|
for k, v := range query {
|
||||||
|
q.Add(k, v)
|
||||||
|
}
|
||||||
|
if query != nil {
|
||||||
|
u.RawQuery = q.Encode()
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := client.Get(u.String())
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
|
||||||
|
body, err := io.ReadAll(res.Body)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
return string(body), nil
|
||||||
|
}
|
||||||
|
|
|
@ -39,6 +39,10 @@ func (g ConnectContainer) GetAddr() (string, int) {
|
||||||
return g.ip, g.appPort
|
return g.ip, g.appPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g ConnectContainer) Restart() error {
|
||||||
|
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
|
||||||
|
}
|
||||||
|
|
||||||
func (g ConnectContainer) GetLogs() (string, error) {
|
func (g ConnectContainer) GetLogs() (string, error) {
|
||||||
rc, err := g.container.Logs(context.Background())
|
rc, err := g.container.Logs(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -49,6 +49,10 @@ func (g exampleContainer) GetAddr() (string, int) {
|
||||||
return g.ip, g.httpPort
|
return g.ip, g.httpPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g exampleContainer) Restart() error {
|
||||||
|
return fmt.Errorf("Restart Unimplemented by ConnectContainer")
|
||||||
|
}
|
||||||
|
|
||||||
func (g exampleContainer) GetLogs() (string, error) {
|
func (g exampleContainer) GetLogs() (string, error) {
|
||||||
rc, err := g.container.Logs(context.Background())
|
rc, err := g.container.Logs(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -79,6 +79,23 @@ func (g gatewayContainer) GetAdminAddr() (string, int) {
|
||||||
return "localhost", g.adminPort
|
return "localhost", g.adminPort
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g gatewayContainer) Restart() error {
|
||||||
|
_, err := g.container.State(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error get gateway state %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = g.container.Stop(context.Background(), nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error stop gateway %s", err)
|
||||||
|
}
|
||||||
|
err = g.container.Start(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error start gateway %s", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func NewGatewayService(ctx context.Context, name string, kind string, node libcluster.Agent) (Service, error) {
|
func NewGatewayService(ctx context.Context, name string, kind string, node libcluster.Agent) (Service, error) {
|
||||||
nodeConfig := node.GetConfig()
|
nodeConfig := node.GetConfig()
|
||||||
if nodeConfig.ScratchDir == "" {
|
if nodeConfig.ScratchDir == "" {
|
||||||
|
|
|
@ -3,9 +3,6 @@ package service
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
|
|
||||||
"github.com/hashicorp/go-cleanhttp"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
|
|
||||||
|
@ -124,39 +121,3 @@ func CreateAndRegisterStaticClientSidecar(
|
||||||
|
|
||||||
return clientConnectProxy, nil
|
return clientConnectProxy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetEnvoyConfigDump(port int, filter string) (string, error) {
|
|
||||||
client := cleanhttp.DefaultClient()
|
|
||||||
url := fmt.Sprintf("http://localhost:%d/config_dump?%s", port, filter)
|
|
||||||
|
|
||||||
res, err := client.Get(url)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
|
|
||||||
body, err := io.ReadAll(res.Body)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return string(body), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func GetEnvoyClusters(port int) (string, error) {
|
|
||||||
client := cleanhttp.DefaultClient()
|
|
||||||
url := fmt.Sprintf("http://localhost:%d/clusters?format=json", port)
|
|
||||||
|
|
||||||
res, err := client.Get(url)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
defer res.Body.Close()
|
|
||||||
|
|
||||||
body, err := io.ReadAll(res.Body)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
return string(body), nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -14,4 +14,5 @@ type Service interface {
|
||||||
GetServiceName() string
|
GetServiceName() string
|
||||||
Start() (err error)
|
Start() (err error)
|
||||||
Terminate() error
|
Terminate() error
|
||||||
|
Restart() error
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ type BuiltCluster struct {
|
||||||
Context *libcluster.BuildContext
|
Context *libcluster.BuildContext
|
||||||
Service libservice.Service
|
Service libservice.Service
|
||||||
Container *libservice.ConnectContainer
|
Container *libservice.ConnectContainer
|
||||||
|
Gateway libservice.Service
|
||||||
}
|
}
|
||||||
|
|
||||||
// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
|
// BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of
|
||||||
|
@ -50,6 +51,7 @@ func BasicPeeringTwoClustersSetup(
|
||||||
|
|
||||||
// Register an static-server service in acceptingCluster and export to dialing cluster
|
// Register an static-server service in acceptingCluster and export to dialing cluster
|
||||||
var serverSidecarService libservice.Service
|
var serverSidecarService libservice.Service
|
||||||
|
var acceptingClusterGateway libservice.Service
|
||||||
{
|
{
|
||||||
clientNode := acceptingCluster.Clients()[0]
|
clientNode := acceptingCluster.Clients()[0]
|
||||||
|
|
||||||
|
@ -62,10 +64,15 @@ func BasicPeeringTwoClustersSetup(
|
||||||
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
|
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
|
||||||
|
|
||||||
require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient))
|
require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient))
|
||||||
|
|
||||||
|
// Create the mesh gateway for dataplane traffic
|
||||||
|
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||||
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register an static-client service in dialing cluster and set upstream to static-server service
|
// Register an static-client service in dialing cluster and set upstream to static-server service
|
||||||
var clientSidecarService *libservice.ConnectContainer
|
var clientSidecarService *libservice.ConnectContainer
|
||||||
|
var dialingClusterGateway libservice.Service
|
||||||
{
|
{
|
||||||
clientNode := dialingCluster.Clients()[0]
|
clientNode := dialingCluster.Clients()[0]
|
||||||
|
|
||||||
|
@ -75,6 +82,10 @@ func BasicPeeringTwoClustersSetup(
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")
|
libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy")
|
||||||
|
|
||||||
|
// Create the mesh gateway for dataplane traffic
|
||||||
|
dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||||
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||||
|
@ -87,12 +98,14 @@ func BasicPeeringTwoClustersSetup(
|
||||||
Context: acceptingCtx,
|
Context: acceptingCtx,
|
||||||
Service: serverSidecarService,
|
Service: serverSidecarService,
|
||||||
Container: nil,
|
Container: nil,
|
||||||
|
Gateway: acceptingClusterGateway,
|
||||||
},
|
},
|
||||||
&BuiltCluster{
|
&BuiltCluster{
|
||||||
Cluster: dialingCluster,
|
Cluster: dialingCluster,
|
||||||
Context: dialingCtx,
|
Context: dialingCtx,
|
||||||
Service: nil,
|
Service: nil,
|
||||||
Container: clientSidecarService,
|
Container: clientSidecarService,
|
||||||
|
Gateway: dialingClusterGateway,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,9 +217,5 @@ func NewPeeringCluster(
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
|
|
||||||
// Create the mesh gateway for dataplane traffic
|
|
||||||
_, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
return cluster, ctx, client
|
return cluster, ctx, client
|
||||||
}
|
}
|
||||||
|
|
|
@ -195,7 +195,7 @@ func verifySidecarHasTwoRootCAs(t *testing.T, sidecar libservice.Service) {
|
||||||
}
|
}
|
||||||
|
|
||||||
retry.RunWith(failer(), t, func(r *retry.R) {
|
retry.RunWith(failer(), t, func(r *retry.R) {
|
||||||
dump, err := libservice.GetEnvoyConfigDump(adminPort, "include_eds")
|
dump, err := libassert.GetEnvoyOutput(adminPort, "config_dump", map[string]string{})
|
||||||
require.NoError(r, err, "could not fetch envoy configuration")
|
require.NoError(r, err, "could not fetch envoy configuration")
|
||||||
|
|
||||||
// Make sure there are two certs in the sidecar
|
// Make sure there are two certs in the sidecar
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
package upgrade
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/api"
|
||||||
|
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||||
|
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
||||||
|
libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology"
|
||||||
|
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestPeering_Upgrade_ControlPlane_MGW verifies the peering control plane traffic go through the mesh gateway
|
||||||
|
// PeerThroughMeshGateways can be inheritted by the upgraded cluster.
|
||||||
|
//
|
||||||
|
// 1. Create the basic peering topology of one dialing cluster and one accepting cluster
|
||||||
|
// 2. Set PeerThroughMeshGateways = true
|
||||||
|
// 3. Upgrade both clusters
|
||||||
|
// 4. Verify the peering is re-established through mesh gateway
|
||||||
|
func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
type testcase struct {
|
||||||
|
oldversion string
|
||||||
|
targetVersion string
|
||||||
|
}
|
||||||
|
tcs := []testcase{
|
||||||
|
// {
|
||||||
|
// TODO: API changed from 1.13 to 1.14 in , PeerName to Peer
|
||||||
|
// exportConfigEntry
|
||||||
|
// oldversion: "1.13",
|
||||||
|
// targetVersion: *utils.TargetVersion,
|
||||||
|
// },
|
||||||
|
{
|
||||||
|
oldversion: "1.14",
|
||||||
|
targetVersion: utils.TargetVersion,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
run := func(t *testing.T, tc testcase) {
|
||||||
|
accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion)
|
||||||
|
var (
|
||||||
|
acceptingCluster = accepting.Cluster
|
||||||
|
dialingCluster = dialing.Cluster
|
||||||
|
)
|
||||||
|
|
||||||
|
dialingClient, err := dialingCluster.GetClient(nil, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
acceptingClient, err := acceptingCluster.GetClient(nil, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Enable peering control plane traffic through mesh gateway
|
||||||
|
req := &api.MeshConfigEntry{
|
||||||
|
Peering: &api.PeeringMeshConfig{
|
||||||
|
PeerThroughMeshGateways: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ok, _, err := dialingClient.ConfigEntries().Set(req, &api.WriteOptions{})
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NoError(t, err)
|
||||||
|
ok, _, err = acceptingClient.ConfigEntries().Set(req, &api.WriteOptions{})
|
||||||
|
require.True(t, ok)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Verify control plane endpoints and traffic in gateway
|
||||||
|
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
|
||||||
|
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1)
|
||||||
|
libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc2.peering", "HEALTHY", 1)
|
||||||
|
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||||
|
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||||
|
"upstream_cx_total", 1)
|
||||||
|
|
||||||
|
// Upgrade the accepting cluster and assert peering is still ACTIVE
|
||||||
|
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
||||||
|
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
|
||||||
|
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
|
||||||
|
|
||||||
|
require.NoError(t, dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
||||||
|
libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive)
|
||||||
|
libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive)
|
||||||
|
|
||||||
|
// POST upgrade validation
|
||||||
|
// - Restarted mesh gateway can receive consul generated configuration
|
||||||
|
// - control plane traffic is through mesh gateway
|
||||||
|
// - Register a new static-client service in dialing cluster and
|
||||||
|
// - set upstream to static-server service in peered cluster
|
||||||
|
|
||||||
|
// Restart the gateway
|
||||||
|
err = dialing.Gateway.Restart()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Restarted gateway should not have any measurement on data plane traffic
|
||||||
|
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
|
||||||
|
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||||
|
"upstream_cx_total", 0)
|
||||||
|
// control plane metrics should be observed
|
||||||
|
libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort,
|
||||||
|
"cluster.server.dc1.peering",
|
||||||
|
"upstream_cx_total", 1)
|
||||||
|
|
||||||
|
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
_, port := clientSidecarService.GetAddr()
|
||||||
|
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||||
|
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1)
|
||||||
|
libassert.HTTPServiceEchoes(t, "localhost", port, "")
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion),
|
||||||
|
func(t *testing.T) {
|
||||||
|
run(t, tc)
|
||||||
|
})
|
||||||
|
// time.Sleep(3 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue