|
|
|
@ -2,11 +2,13 @@ package xds
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"errors" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"sync/atomic" |
|
|
|
|
"testing" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
"github.com/armon/go-metrics" |
|
|
|
|
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" |
|
|
|
|
"github.com/stretchr/testify/require" |
|
|
|
|
rpcstatus "google.golang.org/genproto/googleapis/rpc/status" |
|
|
|
@ -19,7 +21,9 @@ import (
|
|
|
|
|
"github.com/hashicorp/consul/agent/proxycfg" |
|
|
|
|
"github.com/hashicorp/consul/agent/structs" |
|
|
|
|
"github.com/hashicorp/consul/agent/xds/xdscommon" |
|
|
|
|
"github.com/hashicorp/consul/api" |
|
|
|
|
"github.com/hashicorp/consul/sdk/testutil" |
|
|
|
|
"github.com/hashicorp/consul/version" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// NOTE: For these tests, prefer not using xDS protobuf "factory" methods if
|
|
|
|
@ -43,7 +47,20 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|
|
|
|
var snap *proxycfg.ConfigSnapshot |
|
|
|
|
|
|
|
|
|
testutil.RunStep(t, "initial setup", func(t *testing.T) { |
|
|
|
|
snap = newTestSnapshot(t, nil, "") |
|
|
|
|
snap = newTestSnapshot(t, nil, "", &structs.ProxyConfigEntry{ |
|
|
|
|
Kind: structs.ProxyDefaults, |
|
|
|
|
Name: structs.ProxyConfigGlobal, |
|
|
|
|
EnvoyExtensions: []structs.EnvoyExtension{ |
|
|
|
|
{ |
|
|
|
|
Name: api.BuiltinLuaExtension, |
|
|
|
|
Arguments: map[string]interface{}{ |
|
|
|
|
"ProxyType": "connect-proxy", |
|
|
|
|
"Listener": "inbound", |
|
|
|
|
"Script": "x = 0", |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
// Send initial cluster discover. We'll assume we are testing a partial
|
|
|
|
|
// reconnect and include some initial resource versions that will be
|
|
|
|
@ -153,6 +170,8 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
|
|
|
|
|
|
|
|
|
|
// We are caught up, so there should be nothing queued to send.
|
|
|
|
|
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) |
|
|
|
|
|
|
|
|
|
requireExtensionMetrics(t, scenario, api.BuiltinLuaExtension, sid, nil) |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { |
|
|
|
@ -1524,3 +1543,39 @@ func mustMakeVersionMap(t *testing.T, resources ...proto.Message) map[string]str
|
|
|
|
|
} |
|
|
|
|
return m |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func requireExtensionMetrics( |
|
|
|
|
t *testing.T, |
|
|
|
|
scenario *testServerScenario, |
|
|
|
|
extName string, |
|
|
|
|
sid structs.ServiceID, |
|
|
|
|
err error, |
|
|
|
|
) { |
|
|
|
|
data := scenario.sink.Data() |
|
|
|
|
require.Len(t, data, 1) |
|
|
|
|
item := data[0] |
|
|
|
|
|
|
|
|
|
expectLabels := []metrics.Label{ |
|
|
|
|
{Name: "extension", Value: extName}, |
|
|
|
|
{Name: "version", Value: "builtin/" + version.Version}, |
|
|
|
|
{Name: "service", Value: sid.ID}, |
|
|
|
|
{Name: "partition", Value: sid.PartitionOrDefault()}, |
|
|
|
|
{Name: "namespace", Value: sid.NamespaceOrDefault()}, |
|
|
|
|
{Name: "error", Value: strconv.FormatBool(err != nil)}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for _, s := range []string{ |
|
|
|
|
"consul.xds.test.envoy_extension.validate_arguments;", |
|
|
|
|
"consul.xds.test.envoy_extension.validate;", |
|
|
|
|
"consul.xds.test.envoy_extension.extend;", |
|
|
|
|
} { |
|
|
|
|
foundLabel := false |
|
|
|
|
for k, v := range item.Samples { |
|
|
|
|
if strings.HasPrefix(k, s) { |
|
|
|
|
foundLabel = true |
|
|
|
|
require.ElementsMatch(t, expectLabels, v.Labels) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
require.True(t, foundLabel) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|