diff --git a/test/integration/consul-container/libs/assert/service.go b/test/integration/consul-container/libs/assert/service.go index ba46821ffd..15a03be3b6 100644 --- a/test/integration/consul-container/libs/assert/service.go +++ b/test/integration/consul-container/libs/assert/service.go @@ -49,9 +49,17 @@ func CatalogNodeExists(t *testing.T, c *api.Client, nodeName string) { }) } +func HTTPServiceEchoes(t *testing.T, ip string, port int, path string) { + doHTTPServiceEchoes(t, ip, port, path, nil) +} + +func HTTPServiceEchoesResHeader(t *testing.T, ip string, port int, path string, expectedResHeader map[string]string) { + doHTTPServiceEchoes(t, ip, port, path, expectedResHeader) +} + // HTTPServiceEchoes verifies that a post to the given ip/port combination returns the data // in the response body. Optional path can be provided to differentiate requests. -func HTTPServiceEchoes(t *testing.T, ip string, port int, path string) { +func doHTTPServiceEchoes(t *testing.T, ip string, port int, path string, expectedResHeader map[string]string) { const phrase = "hello" failer := func() *retry.Timer { @@ -82,6 +90,24 @@ func HTTPServiceEchoes(t *testing.T, ip string, port int, path string) { if !strings.Contains(string(body), phrase) { r.Fatal("received an incorrect response ", string(body)) } + + for k, v := range expectedResHeader { + if headerValues, ok := res.Header[k]; !ok { + r.Fatal("expected header not found", k) + } else { + found := false + for _, value := range headerValues { + if value == v { + found = true + break + } + } + + if !found { + r.Fatalf("header %s value not match want %s got %s ", k, v, headerValues) + } + } + } }) } diff --git a/test/integration/consul-container/libs/cluster/container.go b/test/integration/consul-container/libs/cluster/container.go index c9ce7792b6..bd4416a35b 100644 --- a/test/integration/consul-container/libs/cluster/container.go +++ b/test/integration/consul-container/libs/cluster/container.go @@ -26,8 +26,9 @@ const bootLogLine = "Consul agent running" const disableRYUKEnv = "TESTCONTAINERS_RYUK_DISABLED" // Exposed ports info -const MaxEnvoyOnNode = 10 // the max number of Envoy sidecar can run along with the agent, base is 19000 -const ServiceUpstreamLocalBindPort = 5000 // local bind Port of service's upstream +const MaxEnvoyOnNode = 10 // the max number of Envoy sidecar can run along with the agent, base is 19000 +const ServiceUpstreamLocalBindPort = 5000 // local bind Port of service's upstream +const ServiceUpstreamLocalBindPort2 = 5001 // local bind Port of service's upstream, for services with 2 upstreams // consulContainerNode implements the Agent interface by running a Consul agent // in a container. @@ -530,6 +531,7 @@ func newContainerRequest(config Config, opts containerOpts) (podRequest, consulR // Envoy upstream listener pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort)) + pod.ExposedPorts = append(pod.ExposedPorts, fmt.Sprintf("%d/tcp", ServiceUpstreamLocalBindPort2)) // Reserve the exposed ports for Envoy admin port, e.g., 19000 - 19009 basePort := 19000 diff --git a/test/integration/consul-container/libs/service/connect.go b/test/integration/consul-container/libs/service/connect.go index b5a8087d2d..49a340bd2a 100644 --- a/test/integration/consul-container/libs/service/connect.go +++ b/test/integration/consul-container/libs/service/connect.go @@ -14,7 +14,6 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster" - libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster" "github.com/hashicorp/consul/test/integration/consul-container/libs/utils" ) @@ -23,7 +22,7 @@ type ConnectContainer struct { ctx context.Context container testcontainers.Container ip string - appPort int + appPort []int externalAdminPort int internalAdminPort int mappedPublicPort int @@ -52,6 +51,10 @@ func (g ConnectContainer) Export(partition, peer string, client *api.Client) err } func (g ConnectContainer) GetAddr() (string, int) { + return g.ip, g.appPort[0] +} + +func (g ConnectContainer) GetAddrs() (string, []int) { return g.ip, g.appPort } @@ -139,7 +142,7 @@ func (g ConnectContainer) GetStatus() (string, error) { // node. The container exposes port serviceBindPort and envoy admin port // (19000) by mapping them onto host ports. The container's name has a prefix // combining datacenter and name. -func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID string, serviceBindPort int, node libcluster.Agent) (*ConnectContainer, error) { +func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID string, serviceBindPorts []int, node cluster.Agent) (*ConnectContainer, error) { nodeConfig := node.GetConfig() if nodeConfig.ScratchDir == "" { return nil, fmt.Errorf("node ScratchDir is required") @@ -209,11 +212,19 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID } var ( - appPortStr = strconv.Itoa(serviceBindPort) + appPortStrs []string adminPortStr = strconv.Itoa(internalAdminPort) ) - info, err := cluster.LaunchContainerOnNode(ctx, node, req, []string{appPortStr, adminPortStr}) + for _, port := range serviceBindPorts { + appPortStrs = append(appPortStrs, strconv.Itoa(port)) + } + + // expose the app ports and the envoy adminPortStr on the agent container + exposedPorts := make([]string, len(appPortStrs)) + copy(exposedPorts, appPortStrs) + exposedPorts = append(exposedPorts, adminPortStr) + info, err := cluster.LaunchContainerOnNode(ctx, node, req, exposedPorts) if err != nil { return nil, err } @@ -222,14 +233,17 @@ func NewConnectService(ctx context.Context, sidecarServiceName string, serviceID ctx: ctx, container: info.Container, ip: info.IP, - appPort: info.MappedPorts[appPortStr].Int(), externalAdminPort: info.MappedPorts[adminPortStr].Int(), internalAdminPort: internalAdminPort, serviceName: sidecarServiceName, } - fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %d\n", - serviceID, out.appPort, serviceBindPort) + for _, port := range appPortStrs { + out.appPort = append(out.appPort, info.MappedPorts[port].Int()) + } + + fmt.Printf("NewConnectService: name %s, mapped App Port %d, service bind port %v\n", + serviceID, out.appPort, serviceBindPorts) fmt.Printf("NewConnectService sidecar: name %s, mapped admin port %d, admin port %d\n", sidecarServiceName, out.externalAdminPort, internalAdminPort) diff --git a/test/integration/consul-container/libs/service/examples.go b/test/integration/consul-container/libs/service/examples.go index da075f5aec..3d6258eaa9 100644 --- a/test/integration/consul-container/libs/service/examples.go +++ b/test/integration/consul-container/libs/service/examples.go @@ -64,6 +64,10 @@ func (g exampleContainer) GetAddr() (string, int) { return g.ip, g.httpPort } +func (g exampleContainer) GetAddrs() (string, []int) { + return "", nil +} + func (g exampleContainer) Restart() error { return fmt.Errorf("Restart Unimplemented by ConnectContainer") } diff --git a/test/integration/consul-container/libs/service/gateway.go b/test/integration/consul-container/libs/service/gateway.go index 70897fc7b0..7028a61292 100644 --- a/test/integration/consul-container/libs/service/gateway.go +++ b/test/integration/consul-container/libs/service/gateway.go @@ -53,6 +53,10 @@ func (g gatewayContainer) GetAddr() (string, int) { return g.ip, g.port } +func (g gatewayContainer) GetAddrs() (string, []int) { + return "", nil +} + func (g gatewayContainer) GetLogs() (string, error) { rc, err := g.container.Logs(context.Background()) if err != nil { diff --git a/test/integration/consul-container/libs/service/helpers.go b/test/integration/consul-container/libs/service/helpers.go index 55ad94bb7f..8de49e45c2 100644 --- a/test/integration/consul-container/libs/service/helpers.go +++ b/test/integration/consul-container/libs/service/helpers.go @@ -61,7 +61,7 @@ func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts _ = serverService.Terminate() }) - serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", serviceOpts.ID), serviceOpts.ID, serviceOpts.HTTPPort, node) // bindPort not used + serverConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", serviceOpts.ID), serviceOpts.ID, []int{serviceOpts.HTTPPort}, node) // bindPort not used if err != nil { return nil, nil, err } @@ -117,7 +117,7 @@ func CreateAndRegisterStaticClientSidecar( } // Create a service and proxy instance - clientConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", StaticClientServiceName), StaticClientServiceName, libcluster.ServiceUpstreamLocalBindPort, node) + clientConnectProxy, err := NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", StaticClientServiceName), StaticClientServiceName, []int{libcluster.ServiceUpstreamLocalBindPort}, node) if err != nil { return nil, err } diff --git a/test/integration/consul-container/libs/service/service.go b/test/integration/consul-container/libs/service/service.go index 99da558226..f32bd67ff8 100644 --- a/test/integration/consul-container/libs/service/service.go +++ b/test/integration/consul-container/libs/service/service.go @@ -12,6 +12,7 @@ type Service interface { // Export a service to the peering cluster Export(partition, peer string, client *api.Client) error GetAddr() (string, int) + GetAddrs() (string, []int) // GetAdminAddr returns the external admin address GetAdminAddr() (string, int) GetLogs() (string, error) diff --git a/test/integration/consul-container/test/upgrade/peering_http_test.go b/test/integration/consul-container/test/upgrade/peering_http_test.go index fe91f76530..b57ed1d1de 100644 --- a/test/integration/consul-container/test/upgrade/peering_http_test.go +++ b/test/integration/consul-container/test/upgrade/peering_http_test.go @@ -21,10 +21,15 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { t.Parallel() type testcase struct { - oldversion string - targetVersion string - name string - create func(*cluster.Cluster) (libservice.Service, error) + oldversion string + targetVersion string + name string + // create creates addtional resources in peered clusters depending on cases, e.g., static-client, + // static server, and config-entries. It returns the proxy services, an assertation function to + // be called to verify the resources. + create func(*cluster.Cluster, *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) + // extraAssertion adds additional assertion function to the common resources across cases. + // common resources includes static-client in dialing cluster, and static-server in accepting cluster. extraAssertion func(int) } tcs := []testcase{ @@ -38,8 +43,8 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { oldversion: "1.14", targetVersion: utils.TargetVersion, name: "basic", - create: func(c *cluster.Cluster) (libservice.Service, error) { - return nil, nil + create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) { + return nil, nil, func() {}, nil }, extraAssertion: func(clientUpstreamPort int) {}, }, @@ -49,7 +54,8 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { name: "http_router", // Create a second static-service at the client agent of accepting cluster and // a service-router that routes /static-server-2 to static-server-2 - create: func(c *cluster.Cluster) (libservice.Service, error) { + create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) { + c := accepting serviceOpts := &libservice.ServiceOpts{ Name: libservice.StaticServer2ServiceName, ID: "static-server-2", @@ -60,7 +66,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { _, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(c.Clients()[0], serviceOpts) libassert.CatalogServiceExists(t, c.Clients()[0].GetClient(), libservice.StaticServer2ServiceName) if err != nil { - return nil, err + return nil, nil, nil, err } err = c.ConfigEntryWrite(&api.ProxyConfigEntry{ Kind: api.ProxyDefaults, @@ -70,7 +76,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { }, }) if err != nil { - return nil, err + return nil, nil, nil, err } routerConfigEntry := &api.ServiceRouterConfigEntry{ Kind: api.ServiceRouter, @@ -90,12 +96,127 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { }, } err = c.ConfigEntryWrite(routerConfigEntry) - return serverConnectProxy, err + return serverConnectProxy, nil, func() {}, err }, extraAssertion: func(clientUpstreamPort int) { libassert.AssertFortioName(t, fmt.Sprintf("http://localhost:%d/static-server-2", clientUpstreamPort), "static-server-2") }, }, + { + oldversion: "1.14", + targetVersion: utils.TargetVersion, + name: "http splitter and resolver", + // In addtional to the basic topology, this case provisions the following + // services in the dialing cluster: + // + // - a new static-client at server_0 that has two upstreams: split-static-server (5000) + // and peer-static-server (5001) + // - a local static-server service at client_0 + // - service-splitter named split-static-server w/ 2 services: "local-static-server" and + // "peer-static-server". + // - service-resolved named local-static-server + // - service-resolved named peer-static-server + create: func(accepting *cluster.Cluster, dialing *cluster.Cluster) (libservice.Service, libservice.Service, func(), error) { + err := dialing.ConfigEntryWrite(&api.ProxyConfigEntry{ + Kind: api.ProxyDefaults, + Name: "global", + Config: map[string]interface{}{ + "protocol": "http", + }, + }) + if err != nil { + return nil, nil, nil, err + } + + clientConnectProxy, err := createAndRegisterStaticClientSidecarWithSplittingUpstreams(dialing) + if err != nil { + return nil, nil, nil, fmt.Errorf("error creating client connect proxy in cluster %s", dialing.NetworkName) + } + + // make a resolver for service peer-static-server + resolverConfigEntry := &api.ServiceResolverConfigEntry{ + Kind: api.ServiceResolver, + Name: "peer-static-server", + Redirect: &api.ServiceResolverRedirect{ + Service: libservice.StaticServerServiceName, + Peer: libtopology.DialingPeerName, + }, + } + err = dialing.ConfigEntryWrite(resolverConfigEntry) + if err != nil { + return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name) + } + + // make a splitter for service split-static-server + splitter := &api.ServiceSplitterConfigEntry{ + Kind: api.ServiceSplitter, + Name: "split-static-server", + Splits: []api.ServiceSplit{ + { + Weight: 50, + Service: "local-static-server", + ResponseHeaders: &api.HTTPHeaderModifiers{ + Set: map[string]string{ + "x-test-split": "local", + }, + }, + }, + { + Weight: 50, + Service: "peer-static-server", + ResponseHeaders: &api.HTTPHeaderModifiers{ + Set: map[string]string{ + "x-test-split": "peer", + }, + }, + }, + }, + } + err = dialing.ConfigEntryWrite(splitter) + if err != nil { + return nil, nil, nil, fmt.Errorf("error writing splitter config entry for %s", splitter.Name) + } + + // make a resolver for service local-static-server + resolverConfigEntry = &api.ServiceResolverConfigEntry{ + Kind: api.ServiceResolver, + Name: "local-static-server", + Redirect: &api.ServiceResolverRedirect{ + Service: libservice.StaticServerServiceName, + }, + } + err = dialing.ConfigEntryWrite(resolverConfigEntry) + if err != nil { + return nil, nil, nil, fmt.Errorf("error writing resolver config entry for %s", resolverConfigEntry.Name) + } + + // Make a static-server in dialing cluster + serviceOpts := &libservice.ServiceOpts{ + Name: libservice.StaticServerServiceName, + ID: "static-server", + HTTPPort: 8081, + GRPCPort: 8078, + } + _, serverConnectProxy, err := libservice.CreateAndRegisterStaticServerAndSidecar(dialing.Clients()[0], serviceOpts) + libassert.CatalogServiceExists(t, dialing.Clients()[0].GetClient(), libservice.StaticServerServiceName) + if err != nil { + return nil, nil, nil, err + } + + _, appPorts := clientConnectProxy.GetAddrs() + assertionFn := func() { + libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{ + "X-Test-Split": "local", + }) + libassert.HTTPServiceEchoesResHeader(t, "localhost", appPorts[0], "", map[string]string{ + "X-Test-Split": "peer", + }) + libassert.HTTPServiceEchoes(t, "localhost", appPorts[0], "") + } + return serverConnectProxy, clientConnectProxy, assertionFn, nil + }, + extraAssertion: func(clientUpstreamPort int) {}, + }, } run := func(t *testing.T, tc testcase) { @@ -115,7 +236,7 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { _, staticClientPort := dialing.Container.GetAddr() _, appPort := dialing.Container.GetAddr() - _, err = tc.create(acceptingCluster) + _, secondClientProxy, assertionAdditionalResources, err := tc.create(acceptingCluster, dialingCluster) require.NoError(t, err) tc.extraAssertion(appPort) @@ -145,6 +266,12 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { require.NoError(t, accepting.Container.Restart()) libassert.HTTPServiceEchoes(t, "localhost", staticClientPort, "") + // restart the secondClientProxy if exist + if secondClientProxy != nil { + require.NoError(t, secondClientProxy.Restart()) + } + assertionAdditionalResources() + clientSidecarService, err := libservice.CreateAndRegisterStaticClientSidecar(dialingCluster.Servers()[0], libtopology.DialingPeerName, true) require.NoError(t, err) _, port := clientSidecarService.GetAddr() @@ -165,3 +292,64 @@ func TestPeering_UpgradeToTarget_fromLatest(t *testing.T) { // time.Sleep(3 * time.Second) } } + +// createAndRegisterStaticClientSidecarWithSplittingUpstreams creates a static-client-1 that +// has two upstreams: split-static-server (5000) and peer-static-server (5001) +func createAndRegisterStaticClientSidecarWithSplittingUpstreams(c *cluster.Cluster) (*libservice.ConnectContainer, error) { + // Do some trickery to ensure that partial completion is correctly torn + // down, but successful execution is not. + var deferClean utils.ResettableDefer + defer deferClean.Execute() + + node := c.Servers()[0] + mgwMode := api.MeshGatewayModeLocal + + // Register the static-client service and sidecar first to prevent race with sidecar + // trying to get xDS before it's ready + req := &api.AgentServiceRegistration{ + Name: libservice.StaticClientServiceName, + Port: 8080, + Connect: &api.AgentServiceConnect{ + SidecarService: &api.AgentServiceRegistration{ + Proxy: &api.AgentServiceConnectProxyConfig{ + Upstreams: []api.Upstream{ + { + DestinationName: "split-static-server", + LocalBindAddress: "0.0.0.0", + LocalBindPort: cluster.ServiceUpstreamLocalBindPort, + MeshGateway: api.MeshGatewayConfig{ + Mode: mgwMode, + }, + }, + { + DestinationName: "peer-static-server", + LocalBindAddress: "0.0.0.0", + LocalBindPort: cluster.ServiceUpstreamLocalBindPort2, + MeshGateway: api.MeshGatewayConfig{ + Mode: mgwMode, + }, + }, + }, + }, + }, + }, + } + + if err := node.GetClient().Agent().ServiceRegister(req); err != nil { + return nil, err + } + + // Create a service and proxy instance + clientConnectProxy, err := libservice.NewConnectService(context.Background(), fmt.Sprintf("%s-sidecar", libservice.StaticClientServiceName), libservice.StaticClientServiceName, []int{cluster.ServiceUpstreamLocalBindPort, cluster.ServiceUpstreamLocalBindPort2}, node) + if err != nil { + return nil, err + } + deferClean.Add(func() { + _ = clientConnectProxy.Terminate() + }) + + // disable cleanup functions now that we have an object with a Terminate() function + deferClean.Reset() + + return clientConnectProxy, nil +}