From 3975cb89bfe0699cbdd2a14e87e725b2440e670e Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Wed, 14 Aug 2019 09:08:46 -0500 Subject: [PATCH] agent: blocking central config RPCs iterations should not interfere with each other (#6316) --- agent/consul/config_endpoint.go | 2 + agent/consul/config_endpoint_test.go | 165 +++++++++++++++++++++++++++ agent/structs/config_entry.go | 6 + 3 files changed, 173 insertions(+) diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 1727d527a4..8f9b005260 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -231,6 +231,8 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r &args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { + reply.Reset() + reply.MeshGateway.Mode = structs.MeshGatewayModeDefault // Pass the WatchSet to both the service and proxy config lookups. If either is updated // during the blocking query, this function will be rerun and these state store lookups diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 7a5bae274c..e1ab202729 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -3,6 +3,7 @@ package consul import ( "os" "testing" + "time" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -733,6 +734,170 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) { require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config) } +func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { + t.Parallel() + + require := require.New(t) + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + // The main thing this should test is that information from one iteration + // of the blocking query does NOT bleed over into the next run. Concretely + // in this test the data present in the initial proxy-defaults should not + // be present when we are woken up due to proxy-defaults being deleted. + + state := s1.fsm.State() + require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "global": 1, + }, + })) + require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "grpc", + })) + require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "http", + })) + + var index uint64 + + { // Verify that we get the results of proxy-defaults and service-defaults for 'foo'. + var out structs.ServiceConfigResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "global": int64(1), + "protocol": "grpc", + }, + QueryMeta: out.QueryMeta, + } + require.Equal(expected, out) + index = out.Index + } + + // Now setup a blocking query for 'foo' while we erase the service-defaults for foo. + { + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(state.DeleteConfigEntry(index+1, + structs.ServiceDefaults, + "foo", + )) + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "global": int64(1), + }, + QueryMeta: out.QueryMeta, + } + require.Equal(expected, out) + + index = out.Index + } + + { // Verify that we get the results of proxy-defaults and service-defaults for 'bar'. + var out structs.ServiceConfigResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "bar", + Datacenter: "dc1", + }, + &out, + )) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "global": int64(1), + "protocol": "http", + }, + QueryMeta: out.QueryMeta, + } + require.Equal(expected, out) + index = out.Index + } + + // Now setup a blocking query for 'bar' while we erase the global proxy-defaults. + { + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(state.DeleteConfigEntry(index+1, + structs.ProxyDefaults, + structs.ProxyConfigGlobal, + )) + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "bar", + Datacenter: "dc1", + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(out.Index, index+1) + + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "http", + }, + QueryMeta: out.QueryMeta, + } + require.Equal(expected, out) + } +} + func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) { t.Parallel() diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 58ccd13cf6..aa5458cf31 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -538,6 +538,12 @@ type ServiceConfigResponse struct { QueryMeta } +func (r *ServiceConfigResponse) Reset() { + r.ProxyConfig = nil + r.UpstreamConfigs = nil + r.MeshGateway = MeshGatewayConfig{} +} + // MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here // because we need custom decoding of the raw interface{} values. func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {