From ab5dac3414fc2cf40bf294677fe2403d70771715 Mon Sep 17 00:00:00 2001 From: cskh Date: Mon, 13 Feb 2023 14:09:12 -0500 Subject: [PATCH] upgrade test: peering with http router config entry (#16231) * upgrade test: peering with http router config entry --- .../consul-container/libs/assert/envoy.go | 11 ++- .../consul-container/libs/assert/service.go | 34 +++++-- .../consul-container/libs/service/connect.go | 6 +- .../consul-container/libs/service/gateway.go | 3 + .../consul-container/libs/service/helpers.go | 7 +- .../libs/topology/peering_topology.go | 11 +-- ...t.go => peering_control_plane_mgw_test.go} | 0 ...eers_http_test.go => peering_http_test.go} | 88 +++++++++++++++++-- 8 files changed, 129 insertions(+), 31 deletions(-) rename test/integration/consul-container/test/upgrade/{peer_control_plane_mgw_test.go => peering_control_plane_mgw_test.go} (100%) rename test/integration/consul-container/test/upgrade/{peers_http_test.go => peering_http_test.go} (52%) diff --git a/test/integration/consul-container/libs/assert/envoy.go b/test/integration/consul-container/libs/assert/envoy.go index 774633f58c..e62118c4f1 100644 --- a/test/integration/consul-container/libs/assert/envoy.go +++ b/test/integration/consul-container/libs/assert/envoy.go @@ -98,23 +98,26 @@ func AssertEnvoyMetricAtMost(t *testing.T, adminPort int, prefix, metric string, } func processMetrics(metrics []string, prefix, metric string, condition func(v int) bool) error { + var err error for _, line := range metrics { if strings.Contains(line, prefix) && strings.Contains(line, metric) { - + var value int metric := strings.Split(line, ":") - v, err := strconv.Atoi(strings.TrimSpace(metric[1])) + value, err = strconv.Atoi(strings.TrimSpace(metric[1])) if err != nil { return fmt.Errorf("err parse metric value %s: %s", metric[1], err) } - if condition(v) { + if condition(value) { return nil + } else { + return fmt.Errorf("metric value doesn's satisfy condition: %d", value) } } } - return fmt.Errorf("error processing stats") + return fmt.Errorf("error metric %s %s not found", prefix, metric) } // AssertEnvoyMetricAtLeast assert the filered metric by prefix and metric is <= count diff --git a/test/integration/consul-container/libs/assert/service.go b/test/integration/consul-container/libs/assert/service.go index 1110cfe09d..ba46821ffd 100644 --- a/test/integration/consul-container/libs/assert/service.go +++ b/test/integration/consul-container/libs/assert/service.go @@ -19,7 +19,7 @@ import ( ) const ( - defaultHTTPTimeout = 100 * time.Second + defaultHTTPTimeout = 120 * time.Second defaultHTTPWait = defaultWait ) @@ -100,20 +100,36 @@ func ServiceLogContains(t *testing.T, service libservice.Service, target string) func AssertFortioName(t *testing.T, urlbase string, name string) { t.Helper() var fortioNameRE = regexp.MustCompile(("\nFORTIO_NAME=(.+)\n")) - var body []byte + client := &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + }, + } retry.RunWith(&retry.Timer{Timeout: defaultHTTPTimeout, Wait: defaultHTTPWait}, t, func(r *retry.R) { - resp, err := http.Get(fmt.Sprintf("%s/debug?env=dump", urlbase)) + fullurl := fmt.Sprintf("%s/debug?env=dump", urlbase) + t.Logf("making call to %s", fullurl) + req, err := http.NewRequest("GET", fullurl, nil) + if err != nil { + r.Fatal("could not make request to service ", fullurl) + } + + resp, err := client.Do(req) + if err != nil { + r.Fatal("could not make call to service ", fullurl) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) if err != nil { r.Error(err) return } - defer resp.Body.Close() - body, err = io.ReadAll(resp.Body) - require.NoError(t, err) + + m := fortioNameRE.FindStringSubmatch(string(body)) + require.GreaterOrEqual(r, len(m), 2) + t.Logf("got response from server name %s", m[1]) + assert.Equal(r, name, m[1]) }) - m := fortioNameRE.FindStringSubmatch(string(body)) - require.GreaterOrEqual(t, len(m), 2) - assert.Equal(t, name, m[1]) } // AssertContainerState validates service container status diff --git a/test/integration/consul-container/libs/service/connect.go b/test/integration/consul-container/libs/service/connect.go index b01eca7c7b..3fa9bcbde2 100644 --- a/test/integration/consul-container/libs/service/connect.go +++ b/test/integration/consul-container/libs/service/connect.go @@ -111,7 +111,7 @@ func (g ConnectContainer) GetStatus() (string, error) { // node. The container exposes port serviceBindPort and envoy admin port // (19000) by mapping them onto host ports. The container's name has a prefix // combining datacenter and name. -func NewConnectService(ctx context.Context, sidecarServiceName string, serviceName string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) { +func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) { nodeConfig := node.GetConfig() if nodeConfig.ScratchDir == "" { return nil, fmt.Errorf("node ScratchDir is required") @@ -145,7 +145,7 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceNa Name: containerName, Cmd: []string{ "consul", "connect", "envoy", - "-sidecar-for", serviceName, + "-sidecar-for", serviceID, "-admin-bind", fmt.Sprintf("0.0.0.0:%d", adminPort), "--", "--log-level", envoyLogLevel, @@ -200,7 +200,7 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceNa } fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %d\n", - serviceName, out.appPort, serviceBindPort) + serviceID, out.appPort, serviceBindPort) fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n", sidecarServiceName, out.adminPort, adminPort) diff --git a/test/integration/consul-container/libs/service/gateway.go b/test/integration/consul-container/libs/service/gateway.go index 450ef99cbd..5da2281338 100644 --- a/test/integration/consul-container/libs/service/gateway.go +++ b/test/integration/consul-container/libs/service/gateway.go @@ -85,10 +85,13 @@ func (g gatewayContainer) Restart() error { return fmt.Errorf("error get gateway state %s", err) } + fmt.Printf("Stopping container: %s\n", g.GetName()) err = g.container.Stop(g.ctx, nil) if err != nil { return fmt.Errorf("error stop gateway %s", err) } + + fmt.Printf("Starting container: %s\n", g.GetName()) err = g.container.Start(g.ctx) if err != nil { return fmt.Errorf("error start gateway %s", err) diff --git a/test/integration/consul-container/libs/service/helpers.go b/test/integration/consul-container/libs/service/helpers.go index 197990f5e4..54a16249ee 100644 --- a/test/integration/consul-container/libs/service/helpers.go +++ b/test/integration/consul-container/libs/service/helpers.go @@ -11,8 +11,9 @@ import ( ) const ( - StaticServerServiceName = "static-server" - StaticClientServiceName = "static-client" + StaticServerServiceName = "static-server" + StaticServer2ServiceName = "static-server-2" + StaticClientServiceName = "static-client" ) type ServiceOpts struct { @@ -62,7 +63,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts _ = serverService.Terminate() }) - serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", StaticServerServiceName), serviceOpts.ID, serviceOpts.HTTPPort, node) // bindPort not used + serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", serviceOpts.ID), serviceOpts.ID, serviceOpts.HTTPPort, node) // bindPort not used if err != nil { return nil, nil, err } diff --git a/test/integration/consul-container/libs/topology/peering_topology.go b/test/integration/consul-container/libs/topology/peering_topology.go index c808a347f4..1c764c45c5 100644 --- a/test/integration/consul-container/libs/topology/peering_topology.go +++ b/test/integration/consul-container/libs/topology/peering_topology.go @@ -24,7 +24,7 @@ type BuiltCluster struct { Cluster *libcluster.Cluster Context *libcluster.BuildContext Service libservice.Service - Container *libservice.ConnectContainer + Container libservice.Service Gateway libservice.Service } @@ -59,7 +59,7 @@ func BasicPeeringTwoClustersSetup( // libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1) // Register an static-server service in acceptingCluster and export to dialing cluster - var serverSidecarService libservice.Service + var serverService, serverSidecarService libservice.Service var acceptingClusterGateway libservice.Service { clientNode := acceptingCluster.Clients()[0] @@ -74,13 +74,13 @@ func BasicPeeringTwoClustersSetup( HTTPPort: 8080, GRPCPort: 8079, } - serverSidecarService, _, err := libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts) + serverService, serverSidecarService, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts) require.NoError(t, err) libassert.CatalogServiceExists(t, acceptingClient, libservice.StaticServerServiceName) libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy") - require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient)) + require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient)) // Create the mesh gateway for dataplane traffic acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode) @@ -115,7 +115,7 @@ func BasicPeeringTwoClustersSetup( Cluster: acceptingCluster, Context: acceptingCtx, Service: serverSidecarService, - Container: nil, + Container: serverSidecarService, Gateway: acceptingClusterGateway, }, &BuiltCluster{ @@ -182,6 +182,7 @@ func NewDialingCluster( // NewPeeringCluster creates a cluster with peering enabled. It also creates // and registers a mesh-gateway at the client agent. The API client returned is // pointed at the client agent. +// - proxy-defaults.protocol = tcp func NewPeeringCluster( t *testing.T, numServers int, diff --git a/test/integration/consul-container/test/upgrade/peer_control_plane_mgw_test.go b/test/integration/consul-container/test/upgrade/peering_control_plane_mgw_test.go similarity index 100% rename from test/integration/consul-container/test/upgrade/peer_control_plane_mgw_test.go rename to test/integration/consul-container/test/upgrade/peering_control_plane_mgw_test.go diff --git a/test/integration/consul-container/test/upgrade/peers_http_test.go b/test/integration/consul-container/test/upgrade/peering_http_test.go similarity index 52% rename from test/integration/consul-container/test/upgrade/peers_http_test.go rename to test/integration/consul-container/test/upgrade/peering_http_test.go index 3bf8636314..aec03a3edb 100644 --- a/test/integration/consul-container/test/upgrade/peers_http_test.go +++ b/test/integration/consul-container/test/upgrade/peering_http_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/consul/api" libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert" + "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster" libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service" libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" @@ -20,8 +21,11 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { t.Parallel() type testcase struct { - oldversion string - targetVersion string + oldversion string + targetVersion string + name string + create func(*cluster.Cluster) (libservice.Service, error) + extraAssertion func(int) } tcs := []testcase{ // { @@ -33,6 +37,64 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { { oldversion: "1.14", targetVersion: utils.TargetVersion, + name: "basic", + create: func(c *cluster.Cluster) (libservice.Service, error) { + return nil, nil + }, + extraAssertion: func(clientUpstreamPort int) {}, + }, + { + oldversion: "1.14", + targetVersion: utils.TargetVersion, + name: "http_router", + // Create a second static-service at the client agent of accepting cluster and + // a service-router that routes /static-server-2 to static-server-2 + create: func(c *cluster.Cluster) (libservice.Service, error) { + serviceOpts := &libservice.ServiceOpts{ + Name: libservice.StaticServer2ServiceName, + ID: "static-server-2", + Meta: map[string]string{"version": "v2"}, + HTTPPort: 8081, + GRPCPort: 8078, + } + _, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(c.Clients()[0], serviceOpts) + libassert.CatalogServiceExists(t, c.Clients()[0].GetClient(), libservice.StaticServer2ServiceName) + if err != nil { + return nil, err + } + err = c.ConfigEntryWrite(&api.ProxyConfigEntry{ + Kind: api.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "protocol": "http", + }, + }) + if err != nil { + return nil, err + } + routerConfigEntry := &api.ServiceRouterConfigEntry{ + Kind: api.ServiceRouter, + Name: libservice.StaticServerServiceName, + Routes: []api.ServiceRoute{ + { + Match: &api.ServiceRouteMatch{ + HTTP: &api.ServiceRouteHTTPMatch{ + PathPrefix: "/" + libservice.StaticServer2ServiceName + "/", + }, + }, + Destination: &api.ServiceRouteDestination{ + Service: libservice.StaticServer2ServiceName, + PrefixRewrite: "/", + }, + }, + }, + } + err = c.ConfigEntryWrite(routerConfigEntry) + return serverConnectProxy, err + }, + extraAssertion: func(clientUpstreamPort int) { + libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", clientUpstreamPort), "static-server-2") + }, }, } @@ -50,6 +112,12 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { require.NoError(t, err) _, gatewayAdminPort := dialing.Gateway.GetAdminAddr() + _, staticClientPort := dialing.Container.GetAddr() + + _, appPort := dialing.Container.GetAddr() + _, err = tc.create(acceptingCluster) + require.NoError(t, err) + tc.extraAssertion(appPort) // Upgrade the accepting cluster and assert peering is still ACTIVE require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion)) @@ -64,27 +132,33 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { // - Register a new static-client service in dialing cluster and // - set upstream to static-server service in peered cluster - // Restart the gateway & proxy sidecar + // Restart the gateway & proxy sidecar, and verify existing connection require.NoError(t, dialing.Gateway.Restart()) - require.NoError(t, dialing.Container.Restart()) - // Restarted gateway should not have any measurement on data plane traffic libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort, "cluster.static-server.default.default.accepting-to-dialer.external", "upstream_cx_total", 0) + libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "") + + require.NoError(t, dialing.Container.Restart()) + libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "") + require.NoError(t, accepting.Container.Restart()) + libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "") clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true) require.NoError(t, err) _, port := clientSidecarService.GetAddr() _, adminPort := clientSidecarService.GetAdminAddr() - require.NoError(t, clientSidecarService.Restart()) libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1) libassert.HTTPServiceEchoes(t, "localhost", port, "") libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), "static-server") + + // TODO: restart static-server-2's sidecar + tc.extraAssertion(appPort) } for _, tc := range tcs { - t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion), + t.Run(fmt.Sprintf("%s upgrade from %s to %s", tc.name, tc.oldversion, tc.targetVersion), func(t *testing.T) { run(t, tc) })