diff --git a/test/integration/consul-container/libs/assert/envoy.go b/test/integration/consul-container/libs/assert/envoy.go index 367656dfb0..1a562542f4 100644 --- a/test/integration/consul-container/libs/assert/envoy.go +++ b/test/integration/consul-container/libs/assert/envoy.go @@ -2,12 +2,16 @@ package assert import ( "fmt" + "io" + "net/url" + "strconv" "strings" "testing" "time" + "github.com/hashicorp/go-cleanhttp" + "github.com/hashicorp/consul/sdk/testutil/retry" - libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -24,7 +28,7 @@ func GetEnvoyListenerTCPFilters(t *testing.T, adminPort int) { } retry.RunWith(failer(), t, func(r *retry.R) { - dump, err = libservice.GetEnvoyConfigDump(adminPort, "") + dump, err = GetEnvoyOutput(adminPort, "config_dump", map[string]string{}) if err != nil { r.Fatal("could not fetch envoy configuration") } @@ -61,18 +65,86 @@ func AssertUpstreamEndpointStatus(t *testing.T, adminPort int, clusterName, heal } retry.RunWith(failer(), t, func(r *retry.R) { - clusters, err = libservice.GetEnvoyClusters(adminPort) + clusters, err = GetEnvoyOutput(adminPort, "clusters", map[string]string{"format": "json"}) if err != nil { - r.Fatal("could not fetch envoy configuration") + r.Fatal("could not fetch envoy clusters") } filter := fmt.Sprintf(`.cluster_statuses[] | select(.name|contains("%s")) | [.host_statuses[].health_status.eds_health_status] | [select(.[] == "%s")] | length`, clusterName, healthStatus) results, err := utils.JQFilter(clusters, filter) - require.NoError(r, err, "could not parse envoy configuration") + require.NoErrorf(r, err, "could not found cluster name %s", clusterName) require.Equal(r, count, len(results)) }) } +// AssertEnvoyMetricAtMost assert the filered metric by prefix and metric is >= count +func AssertEnvoyMetricAtMost(t *testing.T, adminPort int, prefix, metric string, count int) { + var ( + stats string + err error + ) + failer := func() *retry.Timer { + return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond} + } + + retry.RunWith(failer(), t, func(r *retry.R) { + stats, err = GetEnvoyOutput(adminPort, "stats", nil) + if err != nil { + r.Fatal("could not fetch envoy stats") + } + lines := strings.Split(stats, "\n") + err = processMetrics(lines, prefix, metric, func(v int) bool { + return v <= count + }) + require.NoError(r, err) + }) +} + +func processMetrics(metrics []string, prefix, metric string, condition func(v int) bool) error { + for _, line := range metrics { + if strings.Contains(line, prefix) && + strings.Contains(line, metric) { + + metric := strings.Split(line, ":") + fmt.Println(metric[1]) + + v, err := strconv.Atoi(strings.TrimSpace(metric[1])) + if err != nil { + return fmt.Errorf("err parse metric value %s: %s", metric[1], err) + } + + if condition(v) { + return nil + } + } + } + return fmt.Errorf("error processing stats") +} + +// AssertEnvoyMetricAtLeast assert the filered metric by prefix and metric is <= count +func AssertEnvoyMetricAtLeast(t *testing.T, adminPort int, prefix, metric string, count int) { + var ( + stats string + err error + ) + failer := func() *retry.Timer { + return &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond} + } + + retry.RunWith(failer(), t, func(r *retry.R) { + stats, err = GetEnvoyOutput(adminPort, "stats", nil) + if err != nil { + r.Fatal("could not fetch envoy stats") + } + lines := strings.Split(stats, "\n") + + err = processMetrics(lines, prefix, metric, func(v int) bool { + return v >= count + }) + require.NoError(r, err) + }) +} + // GetEnvoyHTTPrbacFilters validates that proxy was configured with an http connection manager // this assertion is currently unused current tests use http protocol func GetEnvoyHTTPrbacFilters(t *testing.T, port int) { @@ -85,7 +157,7 @@ func GetEnvoyHTTPrbacFilters(t *testing.T, port int) { } retry.RunWith(failer(), t, func(r *retry.R) { - dump, err = libservice.GetEnvoyConfigDump(port, "") + dump, err = GetEnvoyOutput(port, "config_dump", map[string]string{}) if err != nil { r.Fatal("could not fetch envoy configuration") } @@ -117,3 +189,33 @@ func sanitizeResult(s string) []string { result := strings.Split(strings.ReplaceAll(s, `,`, " "), " ") return append(result[:0], result[1:]...) } + +func GetEnvoyOutput(port int, path string, query map[string]string) (string, error) { + client := cleanhttp.DefaultClient() + var u url.URL + u.Host = fmt.Sprintf("localhost:%d", port) + u.Scheme = "http" + if path != "" { + u.Path = path + } + q := u.Query() + for k, v := range query { + q.Add(k, v) + } + if query != nil { + u.RawQuery = q.Encode() + } + + res, err := client.Get(u.String()) + if err != nil { + return "", err + } + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return "", err + } + + return string(body), nil +} diff --git a/test/integration/consul-container/libs/service/connect.go b/test/integration/consul-container/libs/service/connect.go index 80e9498bbe..9763e372ff 100644 --- a/test/integration/consul-container/libs/service/connect.go +++ b/test/integration/consul-container/libs/service/connect.go @@ -39,6 +39,10 @@ func (g ConnectContainer) GetAddr() (string, int) { return g.ip, g.appPort } +func (g ConnectContainer) Restart() error { + return fmt.Errorf("Restart Unimplemented by ConnectContainer") +} + func (g ConnectContainer) GetLogs() (string, error) { rc, err := g.container.Logs(context.Background()) if err != nil { diff --git a/test/integration/consul-container/libs/service/examples.go b/test/integration/consul-container/libs/service/examples.go index a7181def4c..82c0136080 100644 --- a/test/integration/consul-container/libs/service/examples.go +++ b/test/integration/consul-container/libs/service/examples.go @@ -49,6 +49,10 @@ func (g exampleContainer) GetAddr() (string, int) { return g.ip, g.httpPort } +func (g exampleContainer) Restart() error { + return fmt.Errorf("Restart Unimplemented by ConnectContainer") +} + func (g exampleContainer) GetLogs() (string, error) { rc, err := g.container.Logs(context.Background()) if err != nil { diff --git a/test/integration/consul-container/libs/service/gateway.go b/test/integration/consul-container/libs/service/gateway.go index 90a79fe1a7..d94f5d1aa0 100644 --- a/test/integration/consul-container/libs/service/gateway.go +++ b/test/integration/consul-container/libs/service/gateway.go @@ -79,6 +79,23 @@ func (g gatewayContainer) GetAdminAddr() (string, int) { return "localhost", g.adminPort } +func (g gatewayContainer) Restart() error { + _, err := g.container.State(context.Background()) + if err != nil { + return fmt.Errorf("error get gateway state %s", err) + } + + err = g.container.Stop(context.Background(), nil) + if err != nil { + return fmt.Errorf("error stop gateway %s", err) + } + err = g.container.Start(context.Background()) + if err != nil { + return fmt.Errorf("error start gateway %s", err) + } + return nil +} + func NewGatewayService(ctx context.Context, name string, kind string, node libcluster.Agent) (Service, error) { nodeConfig := node.GetConfig() if nodeConfig.ScratchDir == "" { diff --git a/test/integration/consul-container/libs/service/helpers.go b/test/integration/consul-container/libs/service/helpers.go index 21c06a3a21..7f7174c46c 100644 --- a/test/integration/consul-container/libs/service/helpers.go +++ b/test/integration/consul-container/libs/service/helpers.go @@ -3,9 +3,6 @@ package service import ( "context" "fmt" - "io" - - "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/consul/api" @@ -124,39 +121,3 @@ func CreateAndRegisterStaticClientSidecar( return clientConnectProxy, nil } - -func GetEnvoyConfigDump(port int, filter string) (string, error) { - client := cleanhttp.DefaultClient() - url := fmt.Sprintf("http://localhost:%d/config_dump?%s", port, filter) - - res, err := client.Get(url) - if err != nil { - return "", err - } - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return "", err - } - - return string(body), nil -} - -func GetEnvoyClusters(port int) (string, error) { - client := cleanhttp.DefaultClient() - url := fmt.Sprintf("http://localhost:%d/clusters?format=json", port) - - res, err := client.Get(url) - if err != nil { - return "", err - } - defer res.Body.Close() - - body, err := io.ReadAll(res.Body) - if err != nil { - return "", err - } - - return string(body), nil -} diff --git a/test/integration/consul-container/libs/service/service.go b/test/integration/consul-container/libs/service/service.go index 305182d73f..75f85a85bf 100644 --- a/test/integration/consul-container/libs/service/service.go +++ b/test/integration/consul-container/libs/service/service.go @@ -14,4 +14,5 @@ type Service interface { GetServiceName() string Start() (err error) Terminate() error + Restart() error } diff --git a/test/integration/consul-container/libs/topology/peering_topology.go b/test/integration/consul-container/libs/topology/peering_topology.go index 5d60ae539d..bfe5489f24 100644 --- a/test/integration/consul-container/libs/topology/peering_topology.go +++ b/test/integration/consul-container/libs/topology/peering_topology.go @@ -25,6 +25,7 @@ type BuiltCluster struct { Context *libcluster.BuildContext Service libservice.Service Container *libservice.ConnectContainer + Gateway libservice.Service } // BasicPeeringTwoClustersSetup sets up a scenario for testing peering, which consists of @@ -50,6 +51,7 @@ func BasicPeeringTwoClustersSetup( // Register an static-server service in acceptingCluster and export to dialing cluster var serverSidecarService libservice.Service + var acceptingClusterGateway libservice.Service { clientNode := acceptingCluster.Clients()[0] @@ -62,10 +64,15 @@ func BasicPeeringTwoClustersSetup( libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy") require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient)) + + // Create the mesh gateway for dataplane traffic + acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode) + require.NoError(t, err) } // Register an static-client service in dialing cluster and set upstream to static-server service var clientSidecarService *libservice.ConnectContainer + var dialingClusterGateway libservice.Service { clientNode := dialingCluster.Clients()[0] @@ -75,6 +82,10 @@ func BasicPeeringTwoClustersSetup( require.NoError(t, err) libassert.CatalogServiceExists(t, dialingClient, "static-client-sidecar-proxy") + + // Create the mesh gateway for dataplane traffic + dialingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode) + require.NoError(t, err) } _, adminPort := clientSidecarService.GetAdminAddr() @@ -87,12 +98,14 @@ func BasicPeeringTwoClustersSetup( Context: acceptingCtx, Service: serverSidecarService, Container: nil, + Gateway: acceptingClusterGateway, }, &BuiltCluster{ Cluster: dialingCluster, Context: dialingCtx, Service: nil, Container: clientSidecarService, + Gateway: dialingClusterGateway, } } @@ -204,9 +217,5 @@ func NewPeeringCluster( require.NoError(t, err) require.True(t, ok) - // Create the mesh gateway for dataplane traffic - _, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode) - require.NoError(t, err) - return cluster, ctx, client } diff --git a/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go b/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go index a460e99118..7172f4def9 100644 --- a/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go +++ b/test/integration/consul-container/test/peering/rotate_server_and_ca_then_fail_test.go @@ -195,7 +195,7 @@ func verifySidecarHasTwoRootCAs(t *testing.T, sidecar libservice.Service) { } retry.RunWith(failer(), t, func(r *retry.R) { - dump, err := libservice.GetEnvoyConfigDump(adminPort, "include_eds") + dump, err := libassert.GetEnvoyOutput(adminPort, "config_dump", map[string]string{}) require.NoError(r, err, "could not fetch envoy configuration") // Make sure there are two certs in the sidecar diff --git a/test/integration/consul-container/test/upgrade/peer_control_plane_mgw_test.go b/test/integration/consul-container/test/upgrade/peer_control_plane_mgw_test.go new file mode 100644 index 0000000000..e62f8b2db9 --- /dev/null +++ b/test/integration/consul-container/test/upgrade/peer_control_plane_mgw_test.go @@ -0,0 +1,121 @@ +package upgrade + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/api" + libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" + libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service" + libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology" + "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" +) + +// TestPeering_Upgrade_ControlPlane_MGW verifies the peering control plane traffic go through the mesh gateway +// PeerThroughMeshGateways can be inheritted by the upgraded cluster. +// +// 1. Create the basic peering topology of one dialing cluster and one accepting cluster +// 2. Set PeerThroughMeshGateways = true +// 3. Upgrade both clusters +// 4. Verify the peering is re-established through mesh gateway +func TestPeering_Upgrade_ControlPlane_MGW(t *testing.T) { + t.Parallel() + + type testcase struct { + oldversion string + targetVersion string + } + tcs := []testcase{ + // { + // TODO: API changed from 1.13 to 1.14 in , PeerName to Peer + // exportConfigEntry + // oldversion: "1.13", + // targetVersion: *utils.TargetVersion, + // }, + { + oldversion: "1.14", + targetVersion: utils.TargetVersion, + }, + } + + run := func(t *testing.T, tc testcase) { + accepting, dialing := libtopology.BasicPeeringTwoClustersSetup(t, tc.oldversion) + var ( + acceptingCluster = accepting.Cluster + dialingCluster = dialing.Cluster + ) + + dialingClient, err := dialingCluster.GetClient(nil, false) + require.NoError(t, err) + + acceptingClient, err := acceptingCluster.GetClient(nil, false) + require.NoError(t, err) + + // Enable peering control plane traffic through mesh gateway + req := &api.MeshConfigEntry{ + Peering: &api.PeeringMeshConfig{ + PeerThroughMeshGateways: true, + }, + } + ok, _, err := dialingClient.ConfigEntries().Set(req, &api.WriteOptions{}) + require.True(t, ok) + require.NoError(t, err) + ok, _, err = acceptingClient.ConfigEntries().Set(req, &api.WriteOptions{}) + require.True(t, ok) + require.NoError(t, err) + + // Verify control plane endpoints and traffic in gateway + _, gatewayAdminPort := dialing.Gateway.GetAdminAddr() + libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc1.peering", "HEALTHY", 1) + libassert.AssertUpstreamEndpointStatus(t, gatewayAdminPort, "server.dc2.peering", "HEALTHY", 1) + libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort, + "cluster.static-server.default.default.accepting-to-dialer.external", + "upstream_cx_total", 1) + + // Upgrade the accepting cluster and assert peering is still ACTIVE + require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)) + libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive) + libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive) + + require.NoError(t, dialingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)) + libassert.PeeringStatus(t, acceptingClient, libtopology.AcceptingPeerName, api.PeeringStateActive) + libassert.PeeringStatus(t, dialingClient, libtopology.DialingPeerName, api.PeeringStateActive) + + // POST upgrade validation + // - Restarted mesh gateway can receive consul generated configuration + // - control plane traffic is through mesh gateway + // - Register a new static-client service in dialing cluster and + // - set upstream to static-server service in peered cluster + + // Restart the gateway + err = dialing.Gateway.Restart() + require.NoError(t, err) + + // Restarted gateway should not have any measurement on data plane traffic + libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort, + "cluster.static-server.default.default.accepting-to-dialer.external", + "upstream_cx_total", 0) + // control plane metrics should be observed + libassert.AssertEnvoyMetricAtLeast(t, gatewayAdminPort, + "cluster.server.dc1.peering", + "upstream_cx_total", 1) + + clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true) + require.NoError(t, err) + _, port := clientSidecarService.GetAddr() + _, adminPort := clientSidecarService.GetAdminAddr() + libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1) + libassert.HTTPServiceEchoes(t, "localhost", port, "") + } + + for _, tc := range tcs { + t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion), + func(t *testing.T) { + run(t, tc) + }) + // time.Sleep(3 * time.Second) + } +}