You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/test-integ/topoutil/asserter.go

534 lines
18 KiB

[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topoutil
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"regexp"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/topology"
)
// Asserter is a utility to help in reducing boilerplate in invoking test
// assertions against consul-topology Sprawl components.
//
// The methods should largely take in *topology.Service instances in lieu of
// ip/ports if there is only one port that makes sense for the assertion (such
// as use of the envoy admin port 19000).
//
// If it's up to the test (like picking an upstream) leave port as an argument
// but still take the service and use that to grab the local ip from the
// topology.Node.
type Asserter struct {
sp SprawlLite
}
// *sprawl.Sprawl satisfies this. We don't need anything else.
type SprawlLite interface {
HTTPClientForCluster(clusterName string) (*http.Client, error)
APIClientForNode(clusterName string, nid topology.NodeID, token string) (*api.Client, error)
APIClientForCluster(clusterName string, token string) (*api.Client, error)
ResourceServiceClientForCluster(clusterName string) pbresource.ResourceServiceClient
Topology() *topology.Topology
}
// NewAsserter creates a new assertion helper for the provided sprawl.
func NewAsserter(sp SprawlLite) *Asserter {
return &Asserter{
sp: sp,
}
}
func (a *Asserter) mustGetHTTPClient(t testutil.TestingTB, cluster string) *http.Client {
client, err := a.httpClientFor(cluster)
require.NoError(t, err)
return client
}
func (a *Asserter) mustGetAPIClient(t testutil.TestingTB, cluster string) *api.Client {
clu := a.sp.Topology().Clusters[cluster]
cl, err := a.sp.APIClientForCluster(clu.Name, "")
require.NoError(t, err)
return cl
}
// httpClientFor returns a pre-configured http.Client that proxies requests
// through the embedded squid instance in each LAN.
//
// Use this in methods below to magically pick the right proxied http client
// given the home of each node being checked.
func (a *Asserter) httpClientFor(cluster string) (*http.Client, error) {
client, err := a.sp.HTTPClientForCluster(cluster)
if err != nil {
return nil, err
}
return client, nil
}
// UpstreamEndpointStatus validates that proxy was configured with provided clusterName in the healthStatus
//
// Exposes libassert.UpstreamEndpointStatus for use against a Sprawl.
//
// NOTE: this doesn't take a port b/c you always want to use the envoy admin port.
func (a *Asserter) UpstreamEndpointStatus(
t *testing.T,
workload *topology.Workload,
clusterName string,
healthStatus string,
count int,
) {
t.Helper()
node := workload.Node
ip := node.LocalAddress()
port := workload.EnvoyAdminPort
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
libassert.AssertUpstreamEndpointStatusWithClient(t, client, addr, clusterName, healthStatus, count)
}
func (a *Asserter) getEnvoyClient(t *testing.T, workload *topology.Workload) (client *http.Client, addr string) {
node := workload.Node
ip := node.LocalAddress()
port := workload.EnvoyAdminPort
addr = fmt.Sprintf("%s:%d", ip, port)
client = a.mustGetHTTPClient(t, node.Cluster)
return client, addr
}
// AssertEnvoyRunningWithClient asserts that envoy is running by querying its stats page
func (a *Asserter) AssertEnvoyRunningWithClient(t *testing.T, workload *topology.Workload) {
t.Helper()
client, addr := a.getEnvoyClient(t, workload)
libassert.AssertEnvoyRunningWithClient(t, client, addr)
}
// AssertEnvoyPresentsCertURIWithClient makes GET request to /certs endpoint and validates that
// two certificates URI is available in the response
func (a *Asserter) AssertEnvoyPresentsCertURIWithClient(t *testing.T, workload *topology.Workload) {
t.Helper()
client, addr := a.getEnvoyClient(t, workload)
libassert.AssertEnvoyPresentsCertURIWithClient(t, client, addr, workload.ID.Name)
}
func (a *Asserter) AssertEnvoyHTTPrbacFiltersContainIntentions(t *testing.T, workload *topology.Workload) {
t.Helper()
client, addr := a.getEnvoyClient(t, workload)
var (
dump string
err error
)
failer := func() *retry.Timer {
return &retry.Timer{Timeout: 30 * time.Second, Wait: 1 * time.Second}
}
retry.RunWith(failer(), t, func(r *retry.R) {
dump, _, err = libassert.GetEnvoyOutputWithClient(client, addr, "config_dump", map[string]string{})
if err != nil {
r.Fatal("could not fetch envoy configuration")
}
})
// the steps below validate that the json result from envoy config dump configured active listeners with rbac and http filters
filter := `.configs[2].dynamic_listeners[].active_state.listener | "\(.name) \( .filter_chains[].filters[] | select(.name == "envoy.filters.network.http_connection_manager") | .typed_config.http_filters | map(.name) | join(","))"`
errorStateFilter := `.configs[2].dynamic_listeners[].error_state"`
configErrorStates, _ := utils.JQFilter(dump, errorStateFilter)
require.Nil(t, configErrorStates, "there should not be any error states on listener configuration")
results, err := utils.JQFilter(dump, filter)
require.NoError(t, err, "could not parse envoy configuration")
var filteredResult []string
for _, result := range results {
parts := strings.Split(strings.ReplaceAll(result, `,`, " "), " ")
sanitizedResult := append(parts[:0], parts[1:]...)
filteredResult = append(filteredResult, sanitizedResult...)
}
require.Contains(t, filteredResult, "envoy.filters.http.rbac")
require.Contains(t, filteredResult, "envoy.filters.http.router")
require.Contains(t, dump, "intentions")
}
// HTTPServiceEchoes verifies that a post to the given ip/port combination
// returns the data in the response body. Optional path can be provided to
// differentiate requests.
//
// Exposes libassert.HTTPServiceEchoes for use against a Sprawl.
//
// NOTE: this takes a port b/c you may want to reach this via your choice of upstream.
func (a *Asserter) HTTPServiceEchoes(
t *testing.T,
workload *topology.Workload,
port int,
path string,
) {
t.Helper()
require.True(t, port > 0)
node := workload.Node
ip := node.LocalAddress()
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
libassert.HTTPServiceEchoesWithClient(t, client, addr, path)
}
// HTTPServiceEchoesResHeader verifies that a post to the given ip/port combination
// returns the data in the response body with expected response headers.
// Optional path can be provided to differentiate requests.
//
// Exposes libassert.HTTPServiceEchoes for use against a Sprawl.
//
// NOTE: this takes a port b/c you may want to reach this via your choice of upstream.
func (a *Asserter) HTTPServiceEchoesResHeader(
t *testing.T,
workload *topology.Workload,
port int,
path string,
expectedResHeader map[string]string,
) {
t.Helper()
require.True(t, port > 0)
node := workload.Node
ip := node.LocalAddress()
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
libassert.HTTPServiceEchoesResHeaderWithClient(t, client, addr, path, expectedResHeader)
}
func (a *Asserter) HTTPStatus(
t *testing.T,
workload *topology.Workload,
port int,
status int,
) {
t.Helper()
require.True(t, port > 0)
node := workload.Node
ip := node.LocalAddress()
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
url := "http://" + addr
retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
resp, err := client.Get(url)
if err != nil {
r.Fatalf("could not make request to %q: %v", url, err)
}
defer resp.Body.Close()
if resp.StatusCode != status {
r.Fatalf("expected status %d, got %d", status, resp.StatusCode)
}
})
}
// asserts that the service sid in cluster and exported by peer localPeerName is passing health checks,
func (a *Asserter) HealthyWithPeer(t *testing.T, cluster string, sid topology.ID, peerName string) {
t.Helper()
cl := a.mustGetAPIClient(t, cluster)
retry.RunWith(&retry.Timer{Timeout: time.Minute * 1, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
svcs, _, err := cl.Health().Service(
sid.Name,
"",
true,
utils.CompatQueryOpts(&api.QueryOptions{
Partition: sid.Partition,
Namespace: sid.Namespace,
Peer: peerName,
}),
)
require.NoError(r, err)
assert.GreaterOrEqual(r, len(svcs), 1)
})
}
type testingT interface {
require.TestingT
Helper()
}
// does a fortio /fetch2 to the given fortio service, targetting the given upstream. Returns
// the body, and response with response.Body already Closed.
//
// We treat 400, 503, and 504s as retryable errors
func (a *Asserter) fortioFetch2Upstream(
t testutil.TestingTB,
client *http.Client,
addr string,
us *topology.Upstream,
path string,
) (body []byte, res *http.Response) {
t.Helper()
err, res := getFortioFetch2UpstreamResponse(t, client, addr, us, path, nil)
require.NoError(t, err)
defer res.Body.Close()
// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
// not sure when this happens, suspect it's when envoy hasn't configured the local upstream yet
require.NotEqual(t, http.StatusBadRequest, res.StatusCode)
body, err = io.ReadAll(res.Body)
require.NoError(t, err)
return body, res
}
func getFortioFetch2UpstreamResponse(t testutil.TestingTB, client *http.Client, addr string, us *topology.Upstream, path string, headers map[string]string) (error, *http.Response) {
actualURL := fmt.Sprintf("http://localhost:%d/%s", us.LocalPort, path)
url := fmt.Sprintf("http://%s/fortio/fetch2?url=%s", addr,
url.QueryEscape(actualURL),
)
req, err := http.NewRequest(http.MethodPost, url, nil)
require.NoError(t, err)
for h, v := range headers {
req.Header.Set(h, v)
}
res, err := client.Do(req)
require.NoError(t, err)
return err, res
}
// uses the /fortio/fetch2 endpoint to do a header echo check against an
// upstream fortio
func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioWrk *topology.Workload, us *topology.Upstream) {
const kPassphrase = "x-passphrase"
const passphrase = "hello"
path := (fmt.Sprintf("/?header=%s:%s", kPassphrase, passphrase))
var (
node = fortioWrk.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.Port)
client = a.mustGetHTTPClient(t, node.Cluster)
)
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
_, res := a.fortioFetch2Upstream(r, client, addr, us, path)
require.Equal(r, http.StatusOK, res.StatusCode)
v := res.Header.Get(kPassphrase)
require.Equal(r, passphrase, v)
})
}
// similar to libassert.AssertFortioName,
// uses the /fortio/fetch2 endpoint to hit the debug endpoint on the upstream,
// and assert that the FORTIO_NAME == name
func (a *Asserter) FortioFetch2FortioName(
t *testing.T,
fortioWrk *topology.Workload,
us *topology.Upstream,
clusterName string,
sid topology.ID,
) {
t.Helper()
var (
node = fortioWrk.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.Port)
client = a.mustGetHTTPClient(t, node.Cluster)
)
var fortioNameRE = regexp.MustCompile(("\nFORTIO_NAME=(.+)\n"))
path := "/debug?env=dump"
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
body, res := a.fortioFetch2Upstream(r, client, addr, us, path)
require.Equal(r, http.StatusOK, res.StatusCode)
// TODO: not sure we should retry these?
m := fortioNameRE.FindStringSubmatch(string(body))
require.GreaterOrEqual(r, len(m), 2)
// TODO: dedupe from NewFortioService
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), m[1])
})
}
func (a *Asserter) FortioFetch2ServiceUnavailable(t *testing.T, fortioWrk *topology.Workload, us *topology.Upstream) {
const kPassphrase = "x-passphrase"
const passphrase = "hello"
path := (fmt.Sprintf("/?header=%s:%s", kPassphrase, passphrase))
a.FortioFetch2ServiceStatusCodes(t, fortioWrk, us, path, nil, []int{http.StatusServiceUnavailable})
}
// FortioFetch2ServiceStatusCodes uses the /fortio/fetch2 endpoint to do a header echo check against a upstream
// fortio and asserts that the returned status code matches the desired one(s)
func (a *Asserter) FortioFetch2ServiceStatusCodes(t *testing.T, fortioWrk *topology.Workload, us *topology.Upstream, path string, headers map[string]string, statuses []int) {
var (
node = fortioWrk.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.Port)
client = a.mustGetHTTPClient(t, node.Cluster)
)
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
_, res := getFortioFetch2UpstreamResponse(r, client, addr, us, path, headers)
defer res.Body.Close()
require.Contains(r, statuses, res.StatusCode)
})
}
// CatalogServiceExists is the same as libassert.CatalogServiceExists, except that it uses
// a proxied API client
func (a *Asserter) CatalogServiceExists(t *testing.T, cluster string, svc string, opts *api.QueryOptions) {
t.Helper()
cl := a.mustGetAPIClient(t, cluster)
libassert.CatalogServiceExists(t, cl, svc, opts)
}
// HealthServiceEntries asserts the service has the expected number of instances and checks
func (a *Asserter) HealthServiceEntries(t *testing.T, cluster string, node *topology.Node, svc string, passingOnly bool, opts *api.QueryOptions, expectedInstance int, expectedChecks int) []*api.ServiceEntry {
t.Helper()
cl, err := a.sp.APIClientForNode(cluster, node.ID(), "")
require.NoError(t, err)
health := cl.Health()
var serviceEntries []*api.ServiceEntry
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
serviceEntries, _, err = health.Service(svc, "", passingOnly, opts)
require.NoError(r, err)
require.Equalf(r, expectedInstance, len(serviceEntries), "dc: %s, service: %s", cluster, serviceEntries[0].Service.Service)
require.Equalf(r, expectedChecks, len(serviceEntries[0].Checks), "dc: %s, service: %s", cluster, serviceEntries[0].Service.Service)
})
return serviceEntries
}
// TokenExist asserts the token exists in the cluster and identical to the expected token
func (a *Asserter) TokenExist(t *testing.T, cluster string, node *topology.Node, expectedToken *api.ACLToken) {
t.Helper()
cl, err := a.sp.APIClientForNode(cluster, node.ID(), "")
require.NoError(t, err)
acl := cl.ACL()
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
retrievedToken, _, err := acl.TokenRead(expectedToken.AccessorID, &api.QueryOptions{})
require.NoError(r, err)
require.True(r, cmp.Equal(expectedToken, retrievedToken), "token %s", expectedToken.Description)
})
}
// AutopilotHealth asserts the autopilot health endpoint return expected state
func (a *Asserter) AutopilotHealth(t *testing.T, cluster *topology.Cluster, leaderName string, expectedHealthy bool) {
t.Helper()
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 10}, t, func(r *retry.R) {
var out *api.OperatorHealthReply
for _, node := range cluster.Nodes {
if node.Name == leaderName {
client, err := a.sp.APIClientForNode(cluster.Name, node.ID(), "")
if err != nil {
r.Log("err at node", node.Name, err)
continue
}
operator := client.Operator()
out, err = operator.AutopilotServerHealth(&api.QueryOptions{})
if err != nil {
r.Log("err at node", node.Name, err)
continue
}
// Got the Autopilot server health response, break the loop
r.Log("Got response at node", node.Name)
break
}
}
r.Log("out", out, "health", out.Healthy)
require.Equal(r, expectedHealthy, out.Healthy)
})
return
}
type AuditEntry struct {
CreatedAt time.Time `json:"created_at"`
EventType string `json:"event_type"`
Payload Payload `json:"payload"`
}
type Payload struct {
ID string `json:"id"`
Version string `json:"version"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Auth Auth `json:"auth"`
Request Request `json:"request"`
Response Response `json:"response,omitempty"` // Response is not present in OperationStart log
Stage string `json:"stage"`
}
type Auth struct {
AccessorID string `json:"accessor_id"`
Description string `json:"description"`
CreateTime time.Time `json:"create_time"`
}
type Request struct {
Operation string `json:"operation"`
Endpoint string `json:"endpoint"`
RemoteAddr string `json:"remote_addr"`
UserAgent string `json:"user_agent"`
Host string `json:"host"`
QueryParams map[string]string `json:"query_params,omitempty"` // QueryParams might not be present in all cases
}
type Response struct {
Status string `json:"status"`
Error string `json:"error,omitempty"` // Error field is optional,shows up only with the status is non 2xx
}
func (a *Asserter) ValidateHealthEndpointAuditLog(t *testing.T, filePath string) {
boolIsErrorJSON := false
jsonData, err := os.ReadFile(filePath)
require.NoError(t, err)
// Print details of each AuditEntry
var entry AuditEntry
events := strings.Split(strings.TrimSpace(string(jsonData)), "\n")
// function to validate the error object when the health endpoint is returning 429
isErrorJSONObject := func(errorString string) error {
// Attempt to unmarshal the error string into a map[string]interface{}
var m map[string]interface{}
err := json.Unmarshal([]byte(errorString), &m)
return err
}
for _, event := range events {
if strings.Contains(event, "/v1/operator/autopilot/health") && strings.Contains(event, "OperationComplete") {
err = json.Unmarshal([]byte(event), &entry)
require.NoError(t, err, "Error unmarshalling JSON: %v\", err")
if entry.Payload.Response.Status == "429" {
boolIsErrorJSON = true
errResponse := entry.Payload.Response.Error
err = isErrorJSONObject(errResponse)
require.NoError(t, err, "Autopilot Health endpoint error response in the audit log is in unexpected format: %v\", err")
break
}
}
}
require.Equal(t, boolIsErrorJSON, true, "Unable to verify audit log health endpoint error message")
}