mirror of https://github.com/hashicorp/consul
agent: blocking central config RPCs iterations should not interfere with each other (#6316)
parent
6d995246a8
commit
3975cb89bf
|
@ -231,6 +231,8 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
&args.QueryOptions,
|
||||
&reply.QueryMeta,
|
||||
func(ws memdb.WatchSet, state *state.Store) error {
|
||||
reply.Reset()
|
||||
|
||||
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
|
||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated
|
||||
// during the blocking query, this function will be rerun and these state store lookups
|
||||
|
|
|
@ -3,6 +3,7 @@ package consul
|
|||
import (
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -733,6 +734,170 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) {
|
|||
require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config)
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
require := require.New(t)
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
// The main thing this should test is that information from one iteration
|
||||
// of the blocking query does NOT bleed over into the next run. Concretely
|
||||
// in this test the data present in the initial proxy-defaults should not
|
||||
// be present when we are woken up due to proxy-defaults being deleted.
|
||||
|
||||
state := s1.fsm.State()
|
||||
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"global": 1,
|
||||
},
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
Protocol: "grpc",
|
||||
}))
|
||||
require.NoError(state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "bar",
|
||||
Protocol: "http",
|
||||
}))
|
||||
|
||||
var index uint64
|
||||
|
||||
{ // Verify that we get the results of proxy-defaults and service-defaults for 'foo'.
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
||||
&structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
&out,
|
||||
))
|
||||
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"global": int64(1),
|
||||
"protocol": "grpc",
|
||||
},
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(expected, out)
|
||||
index = out.Index
|
||||
}
|
||||
|
||||
// Now setup a blocking query for 'foo' while we erase the service-defaults for foo.
|
||||
{
|
||||
// Async cause a change
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.NoError(state.DeleteConfigEntry(index+1,
|
||||
structs.ServiceDefaults,
|
||||
"foo",
|
||||
))
|
||||
}()
|
||||
|
||||
// Re-run the query
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
||||
&structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: "dc1",
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: index,
|
||||
MaxQueryTime: time.Second,
|
||||
},
|
||||
},
|
||||
&out,
|
||||
))
|
||||
|
||||
// Should block at least 100ms
|
||||
require.True(time.Since(start) >= 100*time.Millisecond, "too fast")
|
||||
|
||||
// Check the indexes
|
||||
require.Equal(out.Index, index+1)
|
||||
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"global": int64(1),
|
||||
},
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(expected, out)
|
||||
|
||||
index = out.Index
|
||||
}
|
||||
|
||||
{ // Verify that we get the results of proxy-defaults and service-defaults for 'bar'.
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
||||
&structs.ServiceConfigRequest{
|
||||
Name: "bar",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
&out,
|
||||
))
|
||||
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"global": int64(1),
|
||||
"protocol": "http",
|
||||
},
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(expected, out)
|
||||
index = out.Index
|
||||
}
|
||||
|
||||
// Now setup a blocking query for 'bar' while we erase the global proxy-defaults.
|
||||
{
|
||||
// Async cause a change
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.NoError(state.DeleteConfigEntry(index+1,
|
||||
structs.ProxyDefaults,
|
||||
structs.ProxyConfigGlobal,
|
||||
))
|
||||
}()
|
||||
|
||||
// Re-run the query
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
||||
&structs.ServiceConfigRequest{
|
||||
Name: "bar",
|
||||
Datacenter: "dc1",
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: index,
|
||||
MaxQueryTime: time.Second,
|
||||
},
|
||||
},
|
||||
&out,
|
||||
))
|
||||
|
||||
// Should block at least 100ms
|
||||
require.True(time.Since(start) >= 100*time.Millisecond, "too fast")
|
||||
|
||||
// Check the indexes
|
||||
require.Equal(out.Index, index+1)
|
||||
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(expected, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -538,6 +538,12 @@ type ServiceConfigResponse struct {
|
|||
QueryMeta
|
||||
}
|
||||
|
||||
func (r *ServiceConfigResponse) Reset() {
|
||||
r.ProxyConfig = nil
|
||||
r.UpstreamConfigs = nil
|
||||
r.MeshGateway = MeshGatewayConfig{}
|
||||
}
|
||||
|
||||
// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
|
||||
// because we need custom decoding of the raw interface{} values.
|
||||
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {
|
||||
|
|
Loading…
Reference in New Issue