mirror of https://github.com/hashicorp/consul
upgrade test: peering with http router config entry (#16231)
* upgrade test: peering with http router config entrypull/16253/head
parent
3f22879106
commit
ab5dac3414
|
@ -98,23 +98,26 @@ func AssertEnvoyMetricAtMost(t *testing.T, adminPort int, prefix, metric string,
|
|||
}
|
||||
|
||||
func processMetrics(metrics []string, prefix, metric string, condition func(v int) bool) error {
|
||||
var err error
|
||||
for _, line := range metrics {
|
||||
if strings.Contains(line, prefix) &&
|
||||
strings.Contains(line, metric) {
|
||||
|
||||
var value int
|
||||
metric := strings.Split(line, ":")
|
||||
|
||||
v, err := strconv.Atoi(strings.TrimSpace(metric[1]))
|
||||
value, err = strconv.Atoi(strings.TrimSpace(metric[1]))
|
||||
if err != nil {
|
||||
return fmt.Errorf("err parse metric value %s: %s", metric[1], err)
|
||||
}
|
||||
|
||||
if condition(v) {
|
||||
if condition(value) {
|
||||
return nil
|
||||
} else {
|
||||
return fmt.Errorf("metric value doesn's satisfy condition: %d", value)
|
||||
}
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("error processing stats")
|
||||
return fmt.Errorf("error metric %s %s not found", prefix, metric)
|
||||
}
|
||||
|
||||
// AssertEnvoyMetricAtLeast assert the filered metric by prefix and metric is <= count
|
||||
|
|
|
@ -19,7 +19,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
defaultHTTPTimeout = 100 * time.Second
|
||||
defaultHTTPTimeout = 120 * time.Second
|
||||
defaultHTTPWait = defaultWait
|
||||
)
|
||||
|
||||
|
@ -100,20 +100,36 @@ func ServiceLogContains(t *testing.T, service libservice.Service, target string)
|
|||
func AssertFortioName(t *testing.T, urlbase string, name string) {
|
||||
t.Helper()
|
||||
var fortioNameRE = regexp.MustCompile(("\nFORTIO_NAME=(.+)\n"))
|
||||
var body []byte
|
||||
client := &http.Client{
|
||||
Transport: &http.Transport{
|
||||
DisableKeepAlives: true,
|
||||
},
|
||||
}
|
||||
retry.RunWith(&retry.Timer{Timeout: defaultHTTPTimeout, Wait: defaultHTTPWait}, t, func(r *retry.R) {
|
||||
resp, err := http.Get(fmt.Sprintf("%s/debug?env=dump", urlbase))
|
||||
fullurl := fmt.Sprintf("%s/debug?env=dump", urlbase)
|
||||
t.Logf("making call to %s", fullurl)
|
||||
req, err := http.NewRequest("GET", fullurl, nil)
|
||||
if err != nil {
|
||||
r.Fatal("could not make request to service ", fullurl)
|
||||
}
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
r.Fatal("could not make call to service ", fullurl)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
r.Error(err)
|
||||
return
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err = io.ReadAll(resp.Body)
|
||||
require.NoError(t, err)
|
||||
|
||||
m := fortioNameRE.FindStringSubmatch(string(body))
|
||||
require.GreaterOrEqual(r, len(m), 2)
|
||||
t.Logf("got response from server name %s", m[1])
|
||||
assert.Equal(r, name, m[1])
|
||||
})
|
||||
m := fortioNameRE.FindStringSubmatch(string(body))
|
||||
require.GreaterOrEqual(t, len(m), 2)
|
||||
assert.Equal(t, name, m[1])
|
||||
}
|
||||
|
||||
// AssertContainerState validates service container status
|
||||
|
|
|
@ -111,7 +111,7 @@ func (g ConnectContainer) GetStatus() (string, error) {
|
|||
// node. The container exposes port serviceBindPort and envoy admin port
|
||||
// (19000) by mapping them onto host ports. The container's name has a prefix
|
||||
// combining datacenter and name.
|
||||
func NewConnectService(ctx context.Context, sidecarServiceName string, serviceName string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) {
|
||||
func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) {
|
||||
nodeConfig := node.GetConfig()
|
||||
if nodeConfig.ScratchDir == "" {
|
||||
return nil, fmt.Errorf("node ScratchDir is required")
|
||||
|
@ -145,7 +145,7 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceNa
|
|||
Name: containerName,
|
||||
Cmd: []string{
|
||||
"consul", "connect", "envoy",
|
||||
"-sidecar-for", serviceName,
|
||||
"-sidecar-for", serviceID,
|
||||
"-admin-bind", fmt.Sprintf("0.0.0.0:%d", adminPort),
|
||||
"--",
|
||||
"--log-level", envoyLogLevel,
|
||||
|
@ -200,7 +200,7 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceNa
|
|||
}
|
||||
|
||||
fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %d\n",
|
||||
serviceName, out.appPort, serviceBindPort)
|
||||
serviceID, out.appPort, serviceBindPort)
|
||||
fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n",
|
||||
sidecarServiceName, out.adminPort, adminPort)
|
||||
|
||||
|
|
|
@ -85,10 +85,13 @@ func (g gatewayContainer) Restart() error {
|
|||
return fmt.Errorf("error get gateway state %s", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Stopping container: %s\n", g.GetName())
|
||||
err = g.container.Stop(g.ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error stop gateway %s", err)
|
||||
}
|
||||
|
||||
fmt.Printf("Starting container: %s\n", g.GetName())
|
||||
err = g.container.Start(g.ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error start gateway %s", err)
|
||||
|
|
|
@ -11,8 +11,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
StaticServerServiceName = "static-server"
|
||||
StaticClientServiceName = "static-client"
|
||||
StaticServerServiceName = "static-server"
|
||||
StaticServer2ServiceName = "static-server-2"
|
||||
StaticClientServiceName = "static-client"
|
||||
)
|
||||
|
||||
type ServiceOpts struct {
|
||||
|
@ -62,7 +63,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts
|
|||
_ = serverService.Terminate()
|
||||
})
|
||||
|
||||
serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", StaticServerServiceName), serviceOpts.ID, serviceOpts.HTTPPort, node) // bindPort not used
|
||||
serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", serviceOpts.ID), serviceOpts.ID, serviceOpts.HTTPPort, node) // bindPort not used
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ type BuiltCluster struct {
|
|||
Cluster *libcluster.Cluster
|
||||
Context *libcluster.BuildContext
|
||||
Service libservice.Service
|
||||
Container *libservice.ConnectContainer
|
||||
Container libservice.Service
|
||||
Gateway libservice.Service
|
||||
}
|
||||
|
||||
|
@ -59,7 +59,7 @@ func BasicPeeringTwoClustersSetup(
|
|||
// libassert.PeeringExports(t, acceptingClient, acceptingPeerName, 1)
|
||||
|
||||
// Register an static-server service in acceptingCluster and export to dialing cluster
|
||||
var serverSidecarService libservice.Service
|
||||
var serverService, serverSidecarService libservice.Service
|
||||
var acceptingClusterGateway libservice.Service
|
||||
{
|
||||
clientNode := acceptingCluster.Clients()[0]
|
||||
|
@ -74,13 +74,13 @@ func BasicPeeringTwoClustersSetup(
|
|||
HTTPPort: 8080,
|
||||
GRPCPort: 8079,
|
||||
}
|
||||
serverSidecarService, _, err := libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts)
|
||||
serverService, serverSidecarService, err = libservice.CreateAndRegisterStaticServerAndSidecar(clientNode, &serviceOpts)
|
||||
require.NoError(t, err)
|
||||
|
||||
libassert.CatalogServiceExists(t, acceptingClient, libservice.StaticServerServiceName)
|
||||
libassert.CatalogServiceExists(t, acceptingClient, "static-server-sidecar-proxy")
|
||||
|
||||
require.NoError(t, serverSidecarService.Export("default", AcceptingPeerName, acceptingClient))
|
||||
require.NoError(t, serverService.Export("default", AcceptingPeerName, acceptingClient))
|
||||
|
||||
// Create the mesh gateway for dataplane traffic
|
||||
acceptingClusterGateway, err = libservice.NewGatewayService(context.Background(), "mesh", "mesh", clientNode)
|
||||
|
@ -115,7 +115,7 @@ func BasicPeeringTwoClustersSetup(
|
|||
Cluster: acceptingCluster,
|
||||
Context: acceptingCtx,
|
||||
Service: serverSidecarService,
|
||||
Container: nil,
|
||||
Container: serverSidecarService,
|
||||
Gateway: acceptingClusterGateway,
|
||||
},
|
||||
&BuiltCluster{
|
||||
|
@ -182,6 +182,7 @@ func NewDialingCluster(
|
|||
// NewPeeringCluster creates a cluster with peering enabled. It also creates
|
||||
// and registers a mesh-gateway at the client agent. The API client returned is
|
||||
// pointed at the client agent.
|
||||
// - proxy-defaults.protocol = tcp
|
||||
func NewPeeringCluster(
|
||||
t *testing.T,
|
||||
numServers int,
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/api"
|
||||
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
|
||||
libservice "github.com/hashicorp/consul/test/integration/consul-container/libs/service"
|
||||
libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology"
|
||||
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
|
||||
|
@ -20,8 +21,11 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
|||
t.Parallel()
|
||||
|
||||
type testcase struct {
|
||||
oldversion string
|
||||
targetVersion string
|
||||
oldversion string
|
||||
targetVersion string
|
||||
name string
|
||||
create func(*cluster.Cluster) (libservice.Service, error)
|
||||
extraAssertion func(int)
|
||||
}
|
||||
tcs := []testcase{
|
||||
// {
|
||||
|
@ -33,6 +37,64 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
|||
{
|
||||
oldversion: "1.14",
|
||||
targetVersion: utils.TargetVersion,
|
||||
name: "basic",
|
||||
create: func(c *cluster.Cluster) (libservice.Service, error) {
|
||||
return nil, nil
|
||||
},
|
||||
extraAssertion: func(clientUpstreamPort int) {},
|
||||
},
|
||||
{
|
||||
oldversion: "1.14",
|
||||
targetVersion: utils.TargetVersion,
|
||||
name: "http_router",
|
||||
// Create a second static-service at the client agent of accepting cluster and
|
||||
// a service-router that routes /static-server-2 to static-server-2
|
||||
create: func(c *cluster.Cluster) (libservice.Service, error) {
|
||||
serviceOpts := &libservice.ServiceOpts{
|
||||
Name: libservice.StaticServer2ServiceName,
|
||||
ID: "static-server-2",
|
||||
Meta: map[string]string{"version": "v2"},
|
||||
HTTPPort: 8081,
|
||||
GRPCPort: 8078,
|
||||
}
|
||||
_, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(c.Clients()[0], serviceOpts)
|
||||
libassert.CatalogServiceExists(t, c.Clients()[0].GetClient(), libservice.StaticServer2ServiceName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = c.ConfigEntryWrite(&api.ProxyConfigEntry{
|
||||
Kind: api.ProxyDefaults,
|
||||
Name: "global",
|
||||
Config: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
routerConfigEntry := &api.ServiceRouterConfigEntry{
|
||||
Kind: api.ServiceRouter,
|
||||
Name: libservice.StaticServerServiceName,
|
||||
Routes: []api.ServiceRoute{
|
||||
{
|
||||
Match: &api.ServiceRouteMatch{
|
||||
HTTP: &api.ServiceRouteHTTPMatch{
|
||||
PathPrefix: "/" + libservice.StaticServer2ServiceName + "/",
|
||||
},
|
||||
},
|
||||
Destination: &api.ServiceRouteDestination{
|
||||
Service: libservice.StaticServer2ServiceName,
|
||||
PrefixRewrite: "/",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = c.ConfigEntryWrite(routerConfigEntry)
|
||||
return serverConnectProxy, err
|
||||
},
|
||||
extraAssertion: func(clientUpstreamPort int) {
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", clientUpstreamPort), "static-server-2")
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -50,6 +112,12 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
|
||||
_, gatewayAdminPort := dialing.Gateway.GetAdminAddr()
|
||||
_, staticClientPort := dialing.Container.GetAddr()
|
||||
|
||||
_, appPort := dialing.Container.GetAddr()
|
||||
_, err = tc.create(acceptingCluster)
|
||||
require.NoError(t, err)
|
||||
tc.extraAssertion(appPort)
|
||||
|
||||
// Upgrade the accepting cluster and assert peering is still ACTIVE
|
||||
require.NoError(t, acceptingCluster.StandardUpgrade(t, context.Background(), tc.targetVersion))
|
||||
|
@ -64,27 +132,33 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) {
|
|||
// - Register a new static-client service in dialing cluster and
|
||||
// - set upstream to static-server service in peered cluster
|
||||
|
||||
// Restart the gateway & proxy sidecar
|
||||
// Restart the gateway & proxy sidecar, and verify existing connection
|
||||
require.NoError(t, dialing.Gateway.Restart())
|
||||
require.NoError(t, dialing.Container.Restart())
|
||||
|
||||
// Restarted gateway should not have any measurement on data plane traffic
|
||||
libassert.AssertEnvoyMetricAtMost(t, gatewayAdminPort,
|
||||
"cluster.static-server.default.default.accepting-to-dialer.external",
|
||||
"upstream_cx_total", 0)
|
||||
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
|
||||
|
||||
require.NoError(t, dialing.Container.Restart())
|
||||
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
|
||||
require.NoError(t, accepting.Container.Restart())
|
||||
libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "")
|
||||
|
||||
clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true)
|
||||
require.NoError(t, err)
|
||||
_, port := clientSidecarService.GetAddr()
|
||||
_, adminPort := clientSidecarService.GetAdminAddr()
|
||||
require.NoError(t, clientSidecarService.Restart())
|
||||
libassert.AssertUpstreamEndpointStatus(t, adminPort, fmt.Sprintf("static-server.default.%s.external", libtopology.DialingPeerName), "HEALTHY", 1)
|
||||
libassert.HTTPServiceEchoes(t, "localhost", port, "")
|
||||
libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d", port), "static-server")
|
||||
|
||||
// TODO: restart static-server-2's sidecar
|
||||
tc.extraAssertion(appPort)
|
||||
}
|
||||
|
||||
for _, tc := range tcs {
|
||||
t.Run(fmt.Sprintf("upgrade from %s to %s", tc.oldversion, tc.targetVersion),
|
||||
t.Run(fmt.Sprintf("%s upgrade from %s to %s", tc.name, tc.oldversion, tc.targetVersion),
|
||||
func(t *testing.T) {
|
||||
run(t, tc)
|
||||
})
|
Loading…
Reference in New Issue