consul/test/integration/consul-container/libs/service/helpers.go

415 lines
13 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package service
import (
"context"
"fmt"
"testing"
2023-02-22 17:52:14 +00:00
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
libcluster "github.com/hashicorp/consul/test/integration/consul-container/libs/cluster"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
)
// Default service names used by the helpers in this file.
const (
	// StaticServerServiceName is the upstream destination that
	// CreateAndRegisterStaticClientSidecar targets by default.
	StaticServerServiceName = "static-server"

	// StaticServer2ServiceName names a second static-server instance.
	StaticServer2ServiceName = "static-server-2"

	// StaticClientServiceName is the default name under which
	// CreateAndRegisterStaticClientSidecar registers the client service.
	StaticClientServiceName = "static-client"
)
2023-02-22 17:52:14 +00:00
// Checks holds the settings of the single TTL check that
// CreateAndRegisterStaticServerAndSidecarWithChecks attaches to the
// service registration.
type Checks struct {
	// Name is copied into the registered check's Name.
	Name string

	// TTL is copied into the registered check's TTL (a duration string).
	TTL string
}
// SidecarService carries the Connect sidecar settings of a ServiceOpts.
type SidecarService struct {
	// Port is copied into the sidecar service registration's Port by the
	// helpers that read it.
	Port int

	// Proxy holds the proxy-level options for the sidecar.
	Proxy ConnectProxy
}
// ConnectProxy holds proxy-level options for a sidecar.
type ConnectProxy struct {
	// Mode is converted to an api.ProxyMode when the server helpers build
	// the sidecar registration. CreateAndRegisterStaticClientSidecar rejects
	// a non-empty Mode.
	Mode string
}
2023-02-03 15:20:22 +00:00
type ServiceOpts struct {
Name string
ID string
Meta map[string]string
HTTPPort int
GRPCPort int
// if true, register GRPC port instead of HTTP (default)
RegisterGRPC bool
Checks Checks
Connect SidecarService
Namespace string
Partition string
Locality *api.Locality
Upstreams []api.Upstream
2023-02-03 15:20:22 +00:00
}
2023-02-22 17:52:14 +00:00
// createAndRegisterStaticServerAndSidecar register the services and launch static-server containers
func createAndRegisterStaticServerAndSidecar(node libcluster.Agent, httpPort int, grpcPort int, svc *api.AgentServiceRegistration, customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest, containerArgs ...string) (Service, Service, error) {
// Do some trickery to ensure that partial completion is correctly torn
// down, but successful execution is not.
var deferClean utils.ResettableDefer
defer deferClean.Execute()
2023-02-22 17:52:14 +00:00
if err := node.GetClient().Agent().ServiceRegister(svc); err != nil {
return nil, nil, err
}
// Create a service and proxy instance
serverService, err := NewExampleService(context.Background(), svc.ID, httpPort, grpcPort, node, containerArgs...)
if err != nil {
return nil, nil, err
}
deferClean.Add(func() {
_ = serverService.Terminate()
})
sidecarCfg := SidecarConfig{
Name: fmt.Sprintf("%s-sidecar", svc.ID),
ServiceID: svc.ID,
Namespace: svc.Namespace,
Partition: svc.Partition,
EnableTProxy: svc.Connect != nil &&
svc.Connect.SidecarService != nil &&
svc.Connect.SidecarService.Proxy != nil &&
svc.Connect.SidecarService.Proxy.Mode == api.ProxyModeTransparent,
}
serverConnectProxy, err := NewConnectService(context.Background(), sidecarCfg, []int{svc.Port}, node, customContainerCfg) // bindPort not used
if err != nil {
return nil, nil, err
}
deferClean.Add(func() {
_ = serverConnectProxy.Terminate()
})
// disable cleanup functions now that we have an object with a Terminate() function
deferClean.Reset()
return serverService, serverConnectProxy, nil
}
// createAndRegisterCustomServiceAndSidecar creates a custom service from the given testcontainers.ContainerRequest
// and a sidecar proxy for the service. The customContainerCfg parameter is used to mutate the
// testcontainers.ContainerRequest for the sidecar proxy.
//
// svc is registered with the agent on node before either container is started.
// httpPort, grpcPort and request are forwarded to NewCustomService.
//
// It returns the custom service and the sidecar service, in that order. If any
// step fails, both are nil and the containers already started by this call are
// terminated; the service registration itself is not undone.
func createAndRegisterCustomServiceAndSidecar(node libcluster.Agent,
	httpPort int,
	grpcPort int,
	svc *api.AgentServiceRegistration,
	request testcontainers.ContainerRequest,
	customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest,
) (Service, Service, error) {
	// Do some trickery to ensure that partial completion is correctly torn
	// down, but successful execution is not.
	var deferClean utils.ResettableDefer
	defer deferClean.Execute()

	if err := node.GetClient().Agent().ServiceRegister(svc); err != nil {
		return nil, nil, err
	}

	// Create a service and proxy instance
	serverService, err := NewCustomService(context.Background(), svc.ID, httpPort, grpcPort, node, request)
	if err != nil {
		return nil, nil, err
	}
	deferClean.Add(func() {
		// Best-effort cleanup on a failure path.
		_ = serverService.Terminate()
	})

	sidecarCfg := SidecarConfig{
		Name:      fmt.Sprintf("%s-sidecar", svc.ID),
		ServiceID: svc.ID,
		Namespace: svc.Namespace,
		// Carry the partition through as well, matching
		// createAndRegisterStaticServerAndSidecar; without it the sidecar of
		// a service registered in a non-default partition is configured for
		// the wrong partition.
		Partition: svc.Partition,
		// Transparent proxy is enabled only when the registration explicitly
		// asks for it; every pointer on the path may be nil.
		EnableTProxy: svc.Connect != nil &&
			svc.Connect.SidecarService != nil &&
			svc.Connect.SidecarService.Proxy != nil &&
			svc.Connect.SidecarService.Proxy.Mode == api.ProxyModeTransparent,
	}

	serverConnectProxy, err := NewConnectService(context.Background(), sidecarCfg, []int{svc.Port}, node, customContainerCfg) // bindPort not used
	if err != nil {
		return nil, nil, err
	}
	deferClean.Add(func() {
		_ = serverConnectProxy.Terminate()
	})

	// disable cleanup functions now that we have an object with a Terminate() function
	deferClean.Reset()

	return serverService, serverConnectProxy, nil
}
// CreateAndRegisterCustomServiceAndSidecar builds a service registration from
// serviceOpts, registers it on node, and starts a custom service container
// (described by request) together with its sidecar proxy.
//
// The registered port is serviceOpts.HTTPPort with a TCP health check, or
// serviceOpts.GRPCPort with a gRPC health check when serviceOpts.RegisterGRPC
// is set. customContainerCfg is passed through for the sidecar container.
//
// It returns the service and the sidecar, in that order.
func CreateAndRegisterCustomServiceAndSidecar(node libcluster.Agent,
	serviceOpts *ServiceOpts,
	request testcontainers.ContainerRequest,
	customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest) (Service, Service, error) {

	// Pick the port that is advertised to Consul and probed by the check.
	registeredPort := serviceOpts.HTTPPort
	if serviceOpts.RegisterGRPC {
		registeredPort = serviceOpts.GRPCPort
	}
	checkAddr := fmt.Sprintf("127.0.0.1:%d", registeredPort)

	healthCheck := &api.AgentServiceCheck{
		Name:     "Static Server Listening",
		Interval: "10s",
		Status:   api.HealthPassing,
	}
	// Exactly one of TCP/GRPC is populated, depending on the registered port.
	if serviceOpts.RegisterGRPC {
		healthCheck.GRPC = checkAddr
	} else {
		healthCheck.TCP = checkAddr
	}

	sidecarProxy := &api.AgentServiceConnectProxyConfig{
		Mode: api.ProxyMode(serviceOpts.Connect.Proxy.Mode),
	}

	// Register the static-server service and sidecar first to prevent race with sidecar
	// trying to get xDS before it's ready
	registration := &api.AgentServiceRegistration{
		Name: serviceOpts.Name,
		ID:   serviceOpts.ID,
		Port: registeredPort,
		Connect: &api.AgentServiceConnect{
			SidecarService: &api.AgentServiceRegistration{
				Proxy: sidecarProxy,
			},
		},
		Namespace: serviceOpts.Namespace,
		Partition: serviceOpts.Partition,
		Locality:  serviceOpts.Locality,
		Meta:      serviceOpts.Meta,
		Check:     healthCheck,
	}

	return createAndRegisterCustomServiceAndSidecar(
		node,
		serviceOpts.HTTPPort,
		serviceOpts.GRPCPort,
		registration,
		request,
		customContainerCfg,
	)
}
// CreateAndRegisterStaticServerAndSidecarWithCustomContainerConfig creates an example static server and a sidecar for
// the service. The customContainerCfg parameter is a function of testcontainers.ContainerRequest to
// testcontainers.ContainerRequest which can be used to mutate the container request for the sidecar proxy and inject
// custom configuration and lifecycle hooks.
func CreateAndRegisterStaticServerAndSidecarWithCustomContainerConfig(node libcluster.Agent,
serviceOpts *ServiceOpts,
customContainerCfg func(testcontainers.ContainerRequest) testcontainers.ContainerRequest,
containerArgs ...string) (Service, Service, error) {
2023-02-22 17:52:14 +00:00
// Register the static-server service and sidecar first to prevent race with sidecar
// trying to get xDS before it's ready
p := serviceOpts.HTTPPort
agentCheck := api.AgentServiceCheck{
Name: "Static Server Listening",
TCP: fmt.Sprintf("127.0.0.1:%d", p),
Interval: "10s",
Status: api.HealthPassing,
}
if serviceOpts.RegisterGRPC {
p = serviceOpts.GRPCPort
agentCheck.TCP = ""
agentCheck.GRPC = fmt.Sprintf("127.0.0.1:%d", p)
}
2023-02-22 17:52:14 +00:00
req := &api.AgentServiceRegistration{
Name: serviceOpts.Name,
ID: serviceOpts.ID,
Port: p,
2023-02-22 17:52:14 +00:00
Connect: &api.AgentServiceConnect{
SidecarService: &api.AgentServiceRegistration{
Proxy: &api.AgentServiceConnectProxyConfig{
Mode: api.ProxyMode(serviceOpts.Connect.Proxy.Mode),
},
2023-02-22 17:52:14 +00:00
},
},
Namespace: serviceOpts.Namespace,
Partition: serviceOpts.Partition,
Locality: serviceOpts.Locality,
Meta: serviceOpts.Meta,
Check: &agentCheck,
2023-02-22 17:52:14 +00:00
}
return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req, customContainerCfg, containerArgs...)
}
func CreateAndRegisterStaticServerAndSidecar(node libcluster.Agent, serviceOpts *ServiceOpts, containerArgs ...string) (Service, Service, error) {
return CreateAndRegisterStaticServerAndSidecarWithCustomContainerConfig(node, serviceOpts, nil, containerArgs...)
2023-02-22 17:52:14 +00:00
}
func CreateAndRegisterStaticServerAndSidecarWithChecks(node libcluster.Agent, serviceOpts *ServiceOpts) (Service, Service, error) {
// Register the static-server service and sidecar first to prevent race with sidecar
// trying to get xDS before it's ready
req := &api.AgentServiceRegistration{
Name: serviceOpts.Name,
ID: serviceOpts.ID,
Port: serviceOpts.HTTPPort,
Connect: &api.AgentServiceConnect{
SidecarService: &api.AgentServiceRegistration{
Proxy: &api.AgentServiceConnectProxyConfig{
Mode: api.ProxyMode(serviceOpts.Connect.Proxy.Mode),
},
Port: serviceOpts.Connect.Port,
2023-02-22 17:52:14 +00:00
},
},
Checks: api.AgentServiceChecks{
{
Name: serviceOpts.Checks.Name,
TTL: serviceOpts.Checks.TTL,
},
},
Meta: serviceOpts.Meta,
Namespace: serviceOpts.Namespace,
Partition: serviceOpts.Partition,
Locality: serviceOpts.Locality,
2023-02-22 17:52:14 +00:00
}
return createAndRegisterStaticServerAndSidecar(node, serviceOpts.HTTPPort, serviceOpts.GRPCPort, req, nil)
2023-02-22 17:52:14 +00:00
}
// CreateAndRegisterStaticClientSidecar registers a static-client service on
// node and starts its Connect sidecar proxy.
//
// With enableTProxy the sidecar is registered in transparent mode and no
// explicit upstream is configured. Otherwise a single upstream to
// StaticServerServiceName (optionally on peer peerName) is bound to
// 0.0.0.0:libcluster.ServiceUpstreamLocalBindPort, routed through the local
// mesh gateway when localMeshGateway is true and the remote one otherwise.
//
// serviceOpts may be nil. When provided, Name, HTTPPort, Connect.Port and
// Upstreams override the defaults if set, and Meta, Namespace, Partition and
// Locality are copied into the registration. Setting
// serviceOpts.Connect.Proxy.Mode is rejected with an error.
//
// It returns the sidecar container, or nil and an error if registration or
// container start-up fails.
func CreateAndRegisterStaticClientSidecar(
	node libcluster.Agent,
	peerName string,
	localMeshGateway bool,
	enableTProxy bool,
	serviceOpts *ServiceOpts,
) (*ConnectContainer, error) {
	// Do some trickery to ensure that partial completion is correctly torn
	// down, but successful execution is not.
	var deferClean utils.ResettableDefer
	defer deferClean.Execute()

	var proxy *api.AgentServiceConnectProxyConfig
	if enableTProxy {
		proxy = &api.AgentServiceConnectProxyConfig{
			Mode: api.ProxyModeTransparent,
		}
	} else {
		mgwMode := api.MeshGatewayModeRemote
		if localMeshGateway {
			mgwMode = api.MeshGatewayModeLocal
		}
		proxy = &api.AgentServiceConnectProxyConfig{
			Upstreams: []api.Upstream{{
				DestinationName:  StaticServerServiceName,
				DestinationPeer:  peerName,
				LocalBindAddress: "0.0.0.0",
				LocalBindPort:    libcluster.ServiceUpstreamLocalBindPort,
				MeshGateway: api.MeshGatewayConfig{
					Mode: mgwMode,
				},
			}},
		}
	}

	// Register the static-client service and sidecar first to prevent race with sidecar
	// trying to get xDS before it's ready
	req := &api.AgentServiceRegistration{
		Name: StaticClientServiceName,
		Port: 8080,
		Connect: &api.AgentServiceConnect{
			SidecarService: &api.AgentServiceRegistration{
				Proxy: proxy,
			},
		},
	}

	// Set relevant fields for static client if opts are provided
	if serviceOpts != nil {
		if serviceOpts.Connect.Proxy.Mode != "" {
			return nil, fmt.Errorf("this helper does not support directly setting connect proxy mode; use enableTProxy and/or localMeshGateway instead")
		}
		// These options are defaulted above, so only set them as overrides
		if serviceOpts.Name != "" {
			req.Name = serviceOpts.Name
		}
		if serviceOpts.HTTPPort != 0 {
			req.Port = serviceOpts.HTTPPort
		}
		if serviceOpts.Connect.Port != 0 {
			req.Connect.SidecarService.Port = serviceOpts.Connect.Port
		}
		if len(serviceOpts.Upstreams) > 0 {
			req.Connect.SidecarService.Proxy.Upstreams = serviceOpts.Upstreams
		}
		req.Meta = serviceOpts.Meta
		req.Namespace = serviceOpts.Namespace
		req.Partition = serviceOpts.Partition
		req.Locality = serviceOpts.Locality
	}

	if err := node.GetClient().Agent().ServiceRegister(req); err != nil {
		return nil, err
	}

	// Create a service and proxy instance.
	//
	// The sidecar must point at the service that was actually registered:
	// no explicit ID is set on req, so the name (possibly overridden via
	// serviceOpts) is what identifies it. The namespace and partition of the
	// registration are carried over for the same reason. With nil or default
	// serviceOpts this yields the same values as StaticClientServiceName and
	// empty namespace/partition.
	sidecarCfg := SidecarConfig{
		Name:         fmt.Sprintf("%s-sidecar", req.Name),
		ServiceID:    req.Name,
		Namespace:    req.Namespace,
		Partition:    req.Partition,
		EnableTProxy: enableTProxy,
	}

	clientConnectProxy, err := NewConnectService(context.Background(), sidecarCfg, []int{libcluster.ServiceUpstreamLocalBindPort}, node, nil)
	if err != nil {
		return nil, err
	}
	deferClean.Add(func() {
		// Best-effort cleanup on a failure path.
		_ = clientConnectProxy.Terminate()
	})

	// disable cleanup functions now that we have an object with a Terminate() function
	deferClean.Reset()

	return clientConnectProxy, nil
}
// ClientsCreate adds numClients client agents to cluster, built from the
// given Consul image name and version. The generated agent configuration is
// logged through t, and the test fails immediately if the agents cannot be
// added.
func ClientsCreate(t *testing.T, numClients int, image, version string, cluster *libcluster.Cluster) {
	buildCtx := libcluster.NewBuildContext(t, libcluster.BuildOptions{
		ConsulImageName: image,
		ConsulVersion:   version,
	})

	// Derive a client-mode agent configuration from the build context.
	builder := libcluster.NewConfigBuilder(buildCtx).Client()
	clientConf := builder.ToAgentConfig(t)
	t.Logf("Cluster client config:\n%s", clientConf.JSON)

	err := cluster.AddN(*clientConf, numClients, true)
	require.NoError(t, err)
}
// ServiceCreate registers a service called serviceName on port 9999, with a
// sidecar on port 22005, through client's agent API. It then verifies via the
// catalog that exactly one instance with that name and port exists.
//
// It returns the LastIndex reported by the catalog query. Any failure aborts
// the test through t.
func ServiceCreate(t *testing.T, client *api.Client, serviceName string) uint64 {
	registration := &api.AgentServiceRegistration{
		Name: serviceName,
		Port: 9999,
		Connect: &api.AgentServiceConnect{
			SidecarService: &api.AgentServiceRegistration{
				Port: 22005,
			},
		},
	}
	regErr := client.Agent().ServiceRegister(registration)
	require.NoError(t, regErr)

	// Read the service back from the catalog and check what was stored.
	instances, queryMeta, err := client.Catalog().Service(serviceName, "", &api.QueryOptions{})
	require.NoError(t, err)
	require.Len(t, instances, 1)

	only := instances[0]
	require.Equal(t, serviceName, only.ServiceName)
	require.Equal(t, 9999, only.ServicePort)

	return queryMeta.LastIndex
}
// ServiceHealthBlockingQuery starts a health query for serviceName with
// WaitIndex set to waitIndex in a background goroutine and returns
// immediately.
//
// Exactly one value is delivered: the service entries on the first channel,
// or an error on the second. A successful query that was not answered by the
// streaming backend is reported as an error. Both channels have a buffer of
// one, so the goroutine finishes even if the caller never receives.
func ServiceHealthBlockingQuery(client *api.Client, serviceName string, waitIndex uint64) (chan []*api.ServiceEntry, chan error) {
	resultCh := make(chan []*api.ServiceEntry, 1)
	failCh := make(chan error, 1)

	go func() {
		queryOpts := &api.QueryOptions{WaitIndex: waitIndex}
		entries, queryMeta, err := client.Health().Service(serviceName, "", false, queryOpts)
		if err != nil {
			failCh <- err
			return
		}
		// The query itself succeeded; make sure it used the expected backend.
		if queryMeta.QueryBackend != api.QueryBackendStreaming {
			failCh <- fmt.Errorf("invalid backend for this test %s", queryMeta.QueryBackend)
			return
		}
		resultCh <- entries
	}()

	return resultCh, failCh
}