|
|
@ -32,8 +32,6 @@ const (
|
|
|
|
// - logs for exceeding
|
|
|
|
// - logs for exceeding
|
|
|
|
|
|
|
|
|
|
|
|
func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
t.Parallel()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type action struct {
|
|
|
|
type action struct {
|
|
|
|
function func(client *api.Client) error
|
|
|
|
function func(client *api.Client) error
|
|
|
|
rateLimitOperation string
|
|
|
|
rateLimitOperation string
|
|
|
@ -52,6 +50,7 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
mode string
|
|
|
|
mode string
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// getKV and putKV are net/RPC calls
|
|
|
|
getKV := action{
|
|
|
|
getKV := action{
|
|
|
|
function: func(client *api.Client) error {
|
|
|
|
function: func(client *api.Client) error {
|
|
|
|
_, _, err := client.KV().Get("foo", &api.QueryOptions{})
|
|
|
|
_, _, err := client.KV().Get("foo", &api.QueryOptions{})
|
|
|
@ -99,13 +98,13 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
action: putKV,
|
|
|
|
action: putKV,
|
|
|
|
expectedErrorMsg: "",
|
|
|
|
expectedErrorMsg: "",
|
|
|
|
expectExceededLog: true,
|
|
|
|
expectExceededLog: true,
|
|
|
|
expectMetric: false,
|
|
|
|
expectMetric: true,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
{
|
|
|
|
{
|
|
|
|
action: getKV,
|
|
|
|
action: getKV,
|
|
|
|
expectedErrorMsg: "",
|
|
|
|
expectedErrorMsg: "",
|
|
|
|
expectExceededLog: true,
|
|
|
|
expectExceededLog: true,
|
|
|
|
expectMetric: false,
|
|
|
|
expectMetric: true,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
@ -127,10 +126,13 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
expectMetric: true,
|
|
|
|
expectMetric: true,
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
},
|
|
|
|
}}
|
|
|
|
},
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for _, tc := range testCases {
|
|
|
|
for _, tc := range testCases {
|
|
|
|
|
|
|
|
tc := tc
|
|
|
|
t.Run(tc.description, func(t *testing.T) {
|
|
|
|
t.Run(tc.description, func(t *testing.T) {
|
|
|
|
|
|
|
|
t.Parallel()
|
|
|
|
clusterConfig := &libtopology.ClusterConfig{
|
|
|
|
clusterConfig := &libtopology.ClusterConfig{
|
|
|
|
NumServers: 1,
|
|
|
|
NumServers: 1,
|
|
|
|
NumClients: 0,
|
|
|
|
NumClients: 0,
|
|
|
@ -144,12 +146,9 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
ApplyDefaultProxySettings: false,
|
|
|
|
ApplyDefaultProxySettings: false,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
cluster, _, _ := libtopology.NewCluster(t, clusterConfig)
|
|
|
|
cluster, client := setupClusterAndClient(t, clusterConfig, true)
|
|
|
|
defer terminate(t, cluster)
|
|
|
|
defer terminate(t, cluster)
|
|
|
|
|
|
|
|
|
|
|
|
client, err := cluster.GetClient(nil, true)
|
|
|
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// perform actions and validate returned errors to client
|
|
|
|
// perform actions and validate returned errors to client
|
|
|
|
for _, op := range tc.operations {
|
|
|
|
for _, op := range tc.operations {
|
|
|
|
err := op.action.function(client)
|
|
|
|
err := op.action.function(client)
|
|
|
@ -165,22 +164,14 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
// doing this in a separate loop so we can perform actions, allow metrics
|
|
|
|
// doing this in a separate loop so we can perform actions, allow metrics
|
|
|
|
// and logs to collect and then assert on each.
|
|
|
|
// and logs to collect and then assert on each.
|
|
|
|
for _, op := range tc.operations {
|
|
|
|
for _, op := range tc.operations {
|
|
|
|
timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}
|
|
|
|
timer := &retry.Timer{Timeout: 15 * time.Second, Wait: 500 * time.Millisecond}
|
|
|
|
retry.RunWith(timer, t, func(r *retry.R) {
|
|
|
|
retry.RunWith(timer, t, func(r *retry.R) {
|
|
|
|
// validate metrics
|
|
|
|
checkForMetric(t, cluster, op.action.rateLimitOperation, op.action.rateLimitType, tc.mode, op.expectMetric)
|
|
|
|
metricsInfo, err := client.Agent().Metrics()
|
|
|
|
|
|
|
|
// TODO(NET-1978): currently returns NaN error
|
|
|
|
|
|
|
|
// require.NoError(t, err)
|
|
|
|
|
|
|
|
if metricsInfo != nil && err == nil {
|
|
|
|
|
|
|
|
if op.expectMetric {
|
|
|
|
|
|
|
|
checkForMetric(r, metricsInfo, op.action.rateLimitOperation, op.action.rateLimitType, tc.mode)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// validate logs
|
|
|
|
// validate logs
|
|
|
|
// putting this last as there are cases where logs
|
|
|
|
// putting this last as there are cases where logs
|
|
|
|
// were not present in consumer when assertion was made.
|
|
|
|
// were not present in consumer when assertion was made.
|
|
|
|
checkLogsForMessage(r, clusterConfig.LogConsumer.Msgs,
|
|
|
|
checkLogsForMessage(t, clusterConfig.LogConsumer.Msgs,
|
|
|
|
fmt.Sprintf("[DEBUG] agent.server.rpc-rate-limit: RPC exceeded allowed rate limit: rpc=%s", op.action.rateLimitOperation),
|
|
|
|
fmt.Sprintf("[DEBUG] agent.server.rpc-rate-limit: RPC exceeded allowed rate limit: rpc=%s", op.action.rateLimitOperation),
|
|
|
|
op.action.rateLimitOperation, "exceeded", op.expectExceededLog)
|
|
|
|
op.action.rateLimitOperation, "exceeded", op.expectExceededLog)
|
|
|
|
|
|
|
|
|
|
|
@ -190,43 +181,65 @@ func TestServerRequestRateLimit(t *testing.T) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func checkForMetric(t *retry.R, metricsInfo *api.MetricsInfo, operationName string, expectedLimitType string, expectedMode string) {
|
|
|
|
func setupClusterAndClient(t *testing.T, config *libtopology.ClusterConfig, isServer bool) (*libcluster.Cluster, *api.Client) {
|
|
|
|
const counterName = "consul.rpc.rate_limit.exceeded"
|
|
|
|
cluster, _, _ := libtopology.NewCluster(t, config)
|
|
|
|
|
|
|
|
|
|
|
|
var counter api.SampledValue
|
|
|
|
client, err := cluster.GetClient(nil, isServer)
|
|
|
|
for _, c := range metricsInfo.Counters {
|
|
|
|
require.NoError(t, err)
|
|
|
|
if c.Name == counterName {
|
|
|
|
|
|
|
|
counter = c
|
|
|
|
return cluster, client
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
func checkForMetric(t *testing.T, cluster *libcluster.Cluster, operationName string, expectedLimitType string, expectedMode string, expectMetric bool) {
|
|
|
|
require.NotEmptyf(t, counter.Name, "counter not found: %s", counterName)
|
|
|
|
// validate metrics
|
|
|
|
|
|
|
|
server, err := cluster.GetClient(nil, true)
|
|
|
|
|
|
|
|
require.NoError(t, err)
|
|
|
|
|
|
|
|
metricsInfo, err := server.Agent().Metrics()
|
|
|
|
|
|
|
|
// TODO(NET-1978): currently returns NaN error
|
|
|
|
|
|
|
|
// require.NoError(t, err)
|
|
|
|
|
|
|
|
if metricsInfo != nil && err == nil {
|
|
|
|
|
|
|
|
if expectMetric {
|
|
|
|
|
|
|
|
const counterName = "consul.rpc.rate_limit.exceeded"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
var counter api.SampledValue
|
|
|
|
|
|
|
|
for _, c := range metricsInfo.Counters {
|
|
|
|
|
|
|
|
if c.Name == counterName {
|
|
|
|
|
|
|
|
counter = c
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
require.NotEmptyf(t, counter.Name, "counter not found: %s", counterName)
|
|
|
|
|
|
|
|
|
|
|
|
operation, ok := counter.Labels["op"]
|
|
|
|
operation, ok := counter.Labels["op"]
|
|
|
|
require.True(t, ok)
|
|
|
|
require.True(t, ok)
|
|
|
|
|
|
|
|
|
|
|
|
limitType, ok := counter.Labels["limit_type"]
|
|
|
|
limitType, ok := counter.Labels["limit_type"]
|
|
|
|
require.True(t, ok)
|
|
|
|
require.True(t, ok)
|
|
|
|
|
|
|
|
|
|
|
|
mode, ok := counter.Labels["mode"]
|
|
|
|
mode, ok := counter.Labels["mode"]
|
|
|
|
require.True(t, ok)
|
|
|
|
require.True(t, ok)
|
|
|
|
|
|
|
|
|
|
|
|
if operation == operationName {
|
|
|
|
if operation == operationName {
|
|
|
|
require.GreaterOrEqual(t, counter.Count, 1)
|
|
|
|
require.GreaterOrEqual(t, counter.Count, 1)
|
|
|
|
require.Equal(t, expectedLimitType, limitType)
|
|
|
|
require.Equal(t, expectedLimitType, limitType)
|
|
|
|
require.Equal(t, expectedMode, mode)
|
|
|
|
require.Equal(t, expectedMode, mode)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func checkLogsForMessage(t *retry.R, logs []string, msg string, operationName string, logType string, logShouldExist bool) {
|
|
|
|
func checkLogsForMessage(t *testing.T, logs []string, msg string, operationName string, logType string, logShouldExist bool) {
|
|
|
|
found := false
|
|
|
|
if logShouldExist {
|
|
|
|
for _, log := range logs {
|
|
|
|
found := false
|
|
|
|
if strings.Contains(log, msg) {
|
|
|
|
for _, log := range logs {
|
|
|
|
found = true
|
|
|
|
if strings.Contains(log, msg) {
|
|
|
|
break
|
|
|
|
found = true
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
expectedLog := fmt.Sprintf("%s log check failed for: %s. Log expected: %t", logType, operationName, logShouldExist)
|
|
|
|
|
|
|
|
require.Equal(t, logShouldExist, found, expectedLog)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
require.Equal(t, logShouldExist, found, fmt.Sprintf("%s log check failed for: %s. Log expected: %t", logType, operationName, logShouldExist))
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func terminate(t *testing.T, cluster *libcluster.Cluster) {
|
|
|
|
func terminate(t *testing.T, cluster *libcluster.Cluster) {
|
|
|
|