diff --git a/agent/agent.go b/agent/agent.go index 201dc7aee6..43ec868772 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1948,6 +1948,7 @@ type addServiceLockedRequest struct { // agent using Agent.AddService. type AddServiceRequest struct { Service *structs.NodeService + nodeName string chkTypes []*structs.CheckType persist bool token string @@ -3107,6 +3108,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(addServiceLockedRequest{ AddServiceRequest: AddServiceRequest{ Service: ns, + nodeName: a.config.NodeName, chkTypes: chkTypes, persist: false, // don't rewrite the file with the same data we just read token: service.Token, @@ -3127,6 +3129,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(addServiceLockedRequest{ AddServiceRequest: AddServiceRequest{ Service: sidecar, + nodeName: a.config.NodeName, chkTypes: sidecarChecks, persist: false, // don't rewrite the file with the same data we just read token: sidecarToken, @@ -3225,6 +3228,7 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI err = a.addServiceLocked(addServiceLockedRequest{ AddServiceRequest: AddServiceRequest{ Service: p.Service, + nodeName: a.config.NodeName, chkTypes: nil, persist: false, // don't rewrite the file with the same data we just read token: p.Token, diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index 10fe70c6d4..19789c5434 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -994,6 +994,7 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http. addReq := AddServiceRequest{ Service: ns, + nodeName: s.agent.config.NodeName, chkTypes: chkTypes, persist: true, token: token, @@ -1007,6 +1008,7 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http. if sidecar != nil { addReq := AddServiceRequest{ Service: sidecar, + nodeName: s.agent.config.NodeName, chkTypes: sidecarChecks, persist: true, token: sidecarToken, diff --git a/agent/cache-types/resolved_service_config_test.go b/agent/cache-types/resolved_service_config_test.go index 34c2ddea05..fd01f1bbba 100644 --- a/agent/cache-types/resolved_service_config_test.go +++ b/agent/cache-types/resolved_service_config_test.go @@ -25,6 +25,8 @@ func TestResolvedServiceConfig(t *testing.T) { require.Equal(uint64(24), req.QueryOptions.MinQueryIndex) require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime) require.Equal("foo", req.Name) + require.Equal("foo-1", req.ID) + require.Equal("foo-node", req.NodeName) require.True(req.AllowStale) reply := args.Get(2).(*structs.ServiceConfigResponse) @@ -48,6 +50,8 @@ func TestResolvedServiceConfig(t *testing.T) { }, &structs.ServiceConfigRequest{ Datacenter: "dc1", Name: "foo", + ID: "foo-1", + NodeName: "foo-node", }) require.NoError(err) require.Equal(cache.FetchResult{ diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index b2529133d5..354fb05a47 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -329,31 +329,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { reply.Reset() - reply.MeshGateway.Mode = structs.MeshGatewayModeDefault - // Pass the WatchSet to both the service and proxy config lookups. If either is updated - // during the blocking query, this function will be rerun and these state store lookups - // will both be current. - index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta) - if err != nil { - return err - } - var serviceConf *structs.ServiceConfigEntry - var ok bool - if serviceEntry != nil { - serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) - if !ok { - return fmt.Errorf("invalid service config type %T", serviceEntry) - } - } - // Use the default enterprise meta to look up the global proxy defaults. In the future we may allow per-namespace proxy-defaults - // but not yet. + // Pass the WatchSet to both the service and proxy config lookups. If either is updated during the + // blocking query, this function will be rerun and these state store lookups will both be current. + // We use the default enterprise meta to look up the global proxy defaults because their are not namespaced. _, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta()) if err != nil { return err } - var proxyConf *structs.ProxyConfigEntry + + var ( + proxyConf *structs.ProxyConfigEntry + proxyConfGlobalProtocol string + ok bool + ) if proxyEntry != nil { proxyConf, ok = proxyEntry.(*structs.ProxyConfigEntry) if !ok { @@ -367,11 +357,29 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r reply.ProxyConfig = mapCopy.(map[string]interface{}) reply.MeshGateway = proxyConf.MeshGateway reply.Expose = proxyConf.Expose + + // Extract the global protocol from proxyConf for upstream configs. + rawProtocol := proxyConf.Config["protocol"] + if rawProtocol != nil { + proxyConfGlobalProtocol, ok = rawProtocol.(string) + if !ok { + return fmt.Errorf("invalid protocol type %T", rawProtocol) + } + } } + index, serviceEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, args.Name, &args.EnterpriseMeta) + if err != nil { + return err + } reply.Index = index - if serviceConf != nil { + var serviceConf *structs.ServiceConfigEntry + if serviceEntry != nil { + serviceConf, ok = serviceEntry.(*structs.ServiceConfigEntry) + if !ok { + return fmt.Errorf("invalid service config type %T", serviceEntry) + } if serviceConf.Expose.Checks { reply.Expose.Checks = true } @@ -389,55 +397,121 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r } } - // Extract the global protocol from proxyConf for upstream configs. - var proxyConfGlobalProtocol interface{} - if proxyConf != nil && proxyConf.Config != nil { - proxyConfGlobalProtocol = proxyConf.Config["protocol"] - } + // First collect all upstreams into a set of seen upstreams. + // Upstreams can come from: + // - Explicitly from proxy registrations, and therefore as an argument to this RPC endpoint + // - Implicitly from centralized upstream config in service-defaults + seenUpstreams := map[structs.ServiceID]struct{}{} - // map the legacy request structure using only service names - // to the new ServiceID type. upstreamIDs := args.UpstreamIDs legacyUpstreams := false + // Before Consul namespaces were released, the Upstreams provided to the endpoint did not contain the namespace. + // Because of this we attach the enterprise meta of the request, which will just be the default namespace. if len(upstreamIDs) == 0 { legacyUpstreams = true upstreamIDs = make([]structs.ServiceID, 0) for _, upstream := range args.Upstreams { - upstreamIDs = append(upstreamIDs, structs.NewServiceID(upstream, &args.EnterpriseMeta)) + sid := structs.NewServiceID(upstream, &args.EnterpriseMeta) + upstreamIDs = append(upstreamIDs, sid) } } + // First store all upstreams that were provided in the request + for _, sid := range upstreamIDs { + if _, ok := seenUpstreams[sid]; !ok { + seenUpstreams[sid] = struct{}{} + } + } + + // Then store upstreams inferred from service-defaults + if serviceConf != nil && serviceConf.Connect != nil { + for sid := range serviceConf.Connect.UpstreamConfigs { + seenUpstreams[structs.ServiceIDFromString(sid)] = struct{}{} + } + } + + var ( + upstreamDefaults *structs.UpstreamConfig + upstreamConfigs map[string]*structs.UpstreamConfig + ) + if serviceConf != nil && serviceConf.Connect != nil { + if serviceConf.Connect.UpstreamDefaults != nil { + upstreamDefaults = serviceConf.Connect.UpstreamDefaults + } + if serviceConf.Connect.UpstreamConfigs != nil { + upstreamConfigs = serviceConf.Connect.UpstreamConfigs + } + } + + // The goal is to flatten the mesh gateway mode in this order: + // 0. Value from centralized upstream_defaults + // 1. Value from local proxy registration + // 2. Value from centralized upstream_configs + // 3. Value from local upstream definition. This last step is done in the client's service manager. + var registrationMGConfig structs.MeshGatewayConfig + + if args.ID != "" && args.NodeName != "" { + index, registration, err := state.NodeServiceWatch(ws, args.NodeName, args.ID, &args.EnterpriseMeta) + if err != nil { + return fmt.Errorf("failed to query service registration") + } + if index > reply.Index { + reply.Index = index + } + + if registration != nil && !registration.Proxy.MeshGateway.IsZero() { + registrationMGConfig = registration.Proxy.MeshGateway + } + } + + // usConfigs stores the opaque config map for each upstream and is keyed on the upstream's ID. usConfigs := make(map[structs.ServiceID]map[string]interface{}) - for _, upstream := range upstreamIDs { - _, upstreamEntry, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) + for upstream, _ := range seenUpstreams { + resolvedCfg := make(map[string]interface{}) + + // The protocol of an upstream is resolved in this order: + // 1. Default protocol from proxy-defaults (how all services should be addressed) + // 2. Protocol for upstream service defined in its service-defaults (how the upstream wants to be addressed) + // 3. Protocol defined for the upstream in the service-defaults.(upstream_defaults|upstream_configs) of the downstream + // (how the downstream wants to address it) + protocol := proxyConfGlobalProtocol + + _, upstreamSvcDefaults, err := state.ConfigEntry(ws, structs.ServiceDefaults, upstream.ID, &upstream.EnterpriseMeta) if err != nil { return err } - var upstreamConf *structs.ServiceConfigEntry - var ok bool - if upstreamEntry != nil { - upstreamConf, ok = upstreamEntry.(*structs.ServiceConfigEntry) + if upstreamSvcDefaults != nil { + cfg, ok := upstreamSvcDefaults.(*structs.ServiceConfigEntry) if !ok { - return fmt.Errorf("invalid service config type %T", upstreamEntry) + return fmt.Errorf("invalid service config type %T", upstreamSvcDefaults) + } + if cfg.Protocol != "" { + protocol = cfg.Protocol } } - - // Fallback to proxyConf global protocol. - protocol := proxyConfGlobalProtocol - if upstreamConf != nil && upstreamConf.Protocol != "" { - protocol = upstreamConf.Protocol + if protocol != "" { + resolvedCfg["protocol"] = protocol } - // Nothing to configure if a protocol hasn't been set. - if protocol == nil { - continue + // Merge centralized defaults for all upstreams before configuration for specific upstreams + if upstreamDefaults != nil { + upstreamDefaults.MergeInto(resolvedCfg, args.ID == "") + } + // The value from the proxy registration overrides the one from upstream_defaults because + // it is specific to the proxy instance + if !registrationMGConfig.IsZero() { + resolvedCfg["mesh_gateway"] = registrationMGConfig } - usConfigs[upstream] = map[string]interface{}{ - "protocol": protocol, + if upstreamConfigs[upstream.String()] != nil { + upstreamConfigs[upstream.String()].MergeInto(resolvedCfg, args.ID == "") + } + + if len(resolvedCfg) > 0 { + usConfigs[upstream] = resolvedCfg } } @@ -447,22 +521,21 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r } if legacyUpstreams { - if reply.UpstreamConfigs == nil { - reply.UpstreamConfigs = make(map[string]map[string]interface{}) - } + // For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces + reply.UpstreamConfigs = make(map[string]map[string]interface{}) + for us, conf := range usConfigs { reply.UpstreamConfigs[us.ID] = conf } + } else { - if reply.UpstreamIDConfigs == nil { - reply.UpstreamIDConfigs = make(structs.UpstreamConfigs, 0, len(usConfigs)) - } + reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs)) for us, conf := range usConfigs { - reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, structs.UpstreamConfig{Upstream: us, Config: conf}) + reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs, + structs.OpaqueUpstreamConfig{Upstream: us, Config: conf}) } } - return nil }) } diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index ebeadaae66..a10112834f 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -2,6 +2,7 @@ package consul import ( "os" + "sort" "testing" "time" @@ -892,6 +893,550 @@ func TestConfigEntry_ResolveServiceConfig(t *testing.T) { require.Equal(map[string]interface{}{"foo": 1}, proxyConf.Config) } +func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + tt := []struct { + name string + entries []structs.ConfigEntry + request structs.ServiceConfigRequest + proxyCfg structs.ConnectProxyConfig + expect structs.ServiceConfigResponse + }{ + { + name: "upstream config entries from Upstreams and service-defaults", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Connect: &structs.ConnectConfiguration{ + UpstreamConfigs: map[string]*structs.UpstreamConfig{ + "zip": { + Protocol: "http", + }, + }, + }, + }, + }, + request: structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy-1", + NodeName: "foo-node", + Datacenter: "dc1", + Upstreams: []string{"zap"}, + }, + expect: structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "grpc", + }, + UpstreamConfigs: map[string]map[string]interface{}{ + "zip": { + "protocol": "http", + }, + "zap": { + "protocol": "grpc", + }, + }, + }, + }, + { + name: "upstream config entries from UpstreamIDs and service-defaults", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Connect: &structs.ConnectConfiguration{ + UpstreamConfigs: map[string]*structs.UpstreamConfig{ + "zip": { + Protocol: "http", + }, + }, + }, + }, + }, + request: structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy-1", + NodeName: "foo-node", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{{ID: "zap"}}, + }, + expect: structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "grpc", + }, + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "protocol": "grpc", + }, + }, + { + Upstream: structs.ServiceID{ + ID: "zip", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + }, + }, + }, + { + name: "proxy registration overrides upstream_defaults", + entries: []structs.ConfigEntry{ + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Connect: &structs.ConnectConfiguration{ + UpstreamDefaults: &structs.UpstreamConfig{ + MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, + }, + }, + }, + }, + request: structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy-1", + NodeName: "foo-node", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + {ID: "zap"}, + }, + }, + proxyCfg: structs.ConnectProxyConfig{ + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeNone, + }, + }, + expect: structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "mesh_gateway": map[string]interface{}{ + "Mode": "none", + }, + }, + }, + }, + }, + }, + { + name: "upstream_configs overrides all", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "udp", + }, + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "tcp", + }, + &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Connect: &structs.ConnectConfiguration{ + UpstreamDefaults: &structs.UpstreamConfig{ + Protocol: "http", + MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeRemote}, + PassiveHealthCheck: structs.PassiveHealthCheck{ + Interval: 10, + MaxFailures: 2, + }, + }, + UpstreamConfigs: map[string]*structs.UpstreamConfig{ + "zap": { + Protocol: "grpc", + MeshGateway: structs.MeshGatewayConfig{Mode: structs.MeshGatewayModeLocal}, + }, + }, + }, + }, + }, + request: structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy-1", + NodeName: "foo-node", + Datacenter: "dc1", + UpstreamIDs: []structs.ServiceID{ + {ID: "zap"}, + }, + }, + proxyCfg: structs.ConnectProxyConfig{ + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeNone, + }, + }, + expect: structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "protocol": "udp", + }, + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "passive_health_check": map[string]interface{}{ + "Interval": int64(10), + "MaxFailures": int64(2), + }, + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + "protocol": "grpc", + }, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + + state := s1.fsm.State() + + // Boostrap the config entries + idx := uint64(1) + for _, conf := range tc.entries { + require.NoError(t, state.EnsureConfigEntry(idx, conf)) + idx++ + } + + // The config endpoints pulls the proxy registration if a proxy ID is provided. + if tc.request.ID != "" { + require.NoError(t, state.EnsureNode(4, &structs.Node{ + ID: "9c6e733c-c39d-4555-8d41-0f174a31c489", + Node: tc.request.NodeName, + })) + require.NoError(t, state.EnsureService(5, tc.request.NodeName, &structs.NodeService{ + ID: tc.request.ID, + Service: tc.request.ID, + Proxy: tc.proxyCfg, + })) + } + + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &tc.request, &out)) + + // Don't know what this is deterministically, so we grab it from the response + tc.expect.QueryMeta = out.QueryMeta + + // Order of this slice is also not deterministic since it's populated from a map + sort.SliceStable(out.UpstreamIDConfigs, func(i, j int) bool { + return out.UpstreamIDConfigs[i].Upstream.String() < out.UpstreamIDConfigs[j].Upstream.String() + }) + + require.Equal(t, tc.expect, out) + }) + } +} + +func TestConfigEntry_ResolveServiceConfig_Upstreams_RegistrationBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + nodeName := "foo-node" + + // Create a dummy proxy/service config in the state store to look up. + state := s1.fsm.State() + require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "foo": 1, + }, + })) + require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + })) + require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "grpc", + })) + require.NoError(t, state.EnsureNode(4, &structs.Node{ + ID: "9c6e733c-c39d-4555-8d41-0f174a31c489", + Node: nodeName, + })) + + args := structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy", + NodeName: nodeName, + Datacenter: s1.config.Datacenter, + Upstreams: []string{"bar", "baz"}, + } + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)) + + var index uint64 + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "foo": int64(1), + "protocol": "http", + }, + // This mesh gateway configuration is pulled from foo-proxy's registration + UpstreamConfigs: map[string]map[string]interface{}{ + "bar": { + "protocol": "grpc", + }, + }, + // Don't know what this is deterministically + QueryMeta: out.QueryMeta, + } + require.Equal(t, expected, out) + index = out.Index + + // Now setup a blocking query for 'foo' while we add the proxy registration for foo-proxy. + // Adding the foo proxy registration should cause the blocking query to fire because it is + // watched when the ID and NodeName are provided. + { + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(t, state.EnsureService(index+1, nodeName, &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + }, + })) + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy", + NodeName: nodeName, + Datacenter: "dc1", + Upstreams: []string{"bar", "baz"}, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + // The mesh gateway config from the proxy registration should no longer be present + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "foo": int64(1), + "protocol": "http", + }, + UpstreamConfigs: map[string]map[string]interface{}{ + "bar": { + "protocol": "grpc", + "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, + }, + "baz": { + "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, + }, + }, + // Don't know what this is deterministically + QueryMeta: out.QueryMeta, + } + require.Equal(t, expected, out) + } +} + +func TestConfigEntry_ResolveServiceConfig_Upstreams_DegistrationBlocking(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() + + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForTestAgent(t, s1.RPC, "dc1") + + nodeName := "foo-node" + + // Create a dummy proxy/service config in the state store to look up. + state := s1.fsm.State() + require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "foo": 1, + }, + })) + require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "foo", + Protocol: "http", + })) + require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{ + Kind: structs.ServiceDefaults, + Name: "bar", + Protocol: "grpc", + })) + require.NoError(t, state.EnsureNode(4, &structs.Node{ + ID: "9c6e733c-c39d-4555-8d41-0f174a31c489", + Node: nodeName, + })) + require.NoError(t, state.EnsureService(5, nodeName, &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + }, + })) + + args := structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy", + NodeName: nodeName, + Datacenter: s1.config.Datacenter, + Upstreams: []string{"bar", "baz"}, + } + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out)) + + var index uint64 + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "foo": int64(1), + "protocol": "http", + }, + // This mesh gateway configuration is pulled from foo-proxy's registration + UpstreamConfigs: map[string]map[string]interface{}{ + "bar": { + "protocol": "grpc", + "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, + }, + "baz": { + "mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)}, + }, + }, + // Don't know what this is deterministically + QueryMeta: out.QueryMeta, + } + require.Equal(t, expected, out) + index = out.Index + + // Now setup a blocking query for 'foo' while we erase the proxy registration for foo-proxy. + // Deleting the foo proxy registration should cause the blocking query to fire because it is + // watched when the ID and NodeName are provided. + { + // Async cause a change + start := time.Now() + go func() { + time.Sleep(100 * time.Millisecond) + require.NoError(t, state.DeleteService(index+1, nodeName, "foo-proxy", nil)) + }() + + // Re-run the query + var out structs.ServiceConfigResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", + &structs.ServiceConfigRequest{ + Name: "foo", + ID: "foo-proxy", + NodeName: nodeName, + Datacenter: "dc1", + Upstreams: []string{"bar", "baz"}, + QueryOptions: structs.QueryOptions{ + MinQueryIndex: index, + MaxQueryTime: time.Second, + }, + }, + &out, + )) + + // Should block at least 100ms + require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast") + + // Check the indexes + require.Equal(t, out.Index, index+1) + + // The mesh gateway config from the proxy registration should no longer be present + expected := structs.ServiceConfigResponse{ + ProxyConfig: map[string]interface{}{ + "foo": int64(1), + "protocol": "http", + }, + UpstreamConfigs: map[string]map[string]interface{}{ + "bar": { + "protocol": "grpc", + }, + }, + // Don't know what this is deterministically + QueryMeta: out.QueryMeta, + } + require.Equal(t, expected, out) + } +} + func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 71a31d2725..e525196c9d 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1136,6 +1136,24 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs. return idx, service, nil } +// NodeServiceWatch is used to retrieve a specific service associated with the given +// node, and add it to the watch set. +func (s *Store) NodeServiceWatch(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.NodeService, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + // Get the table index. + idx := catalogServicesMaxIndex(tx, entMeta) + + // Query the service + service, err := getNodeServiceWatchTxn(tx, ws, nodeName, serviceID, entMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) + } + + return idx, service, nil +} + func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { // Query the service _, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) @@ -1150,6 +1168,21 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs. return nil, nil } +func getNodeServiceWatchTxn(tx ReadTxn, ws memdb.WatchSet, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) { + // Query the service + watchCh, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID) + if err != nil { + return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) + } + ws.Add(watchCh) + + if service != nil { + return service.(*structs.ServiceNode).ToNodeService(), nil + } + + return nil, nil +} + func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) { tx := s.db.Txn(false) defer tx.Abort() diff --git a/agent/service_manager.go b/agent/service_manager.go index 591c0981f8..8f3f31625d 100644 --- a/agent/service_manager.go +++ b/agent/service_manager.go @@ -309,8 +309,13 @@ func (w *serviceConfigWatch) handleUpdate(ctx context.Context, event cache.Updat } func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceConfigRequest { - ns := addReq.Service - name := ns.Service + var ( + ns = addReq.Service + name = ns.Service + id = ns.ID + node = addReq.nodeName + ) + var upstreams []structs.ServiceID // Note that only sidecar proxies should even make it here for now although @@ -333,6 +338,8 @@ func makeConfigRequest(bd BaseDeps, addReq AddServiceRequest) *structs.ServiceCo req := &structs.ServiceConfigRequest{ Name: name, + ID: id, + NodeName: node, Datacenter: bd.RuntimeConfig.Datacenter, QueryOptions: structs.QueryOptions{Token: addReq.token}, UpstreamIDs: upstreams, @@ -365,7 +372,6 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct if err := mergo.Merge(&ns.Proxy.Config, defaults.ProxyConfig); err != nil { return nil, err } - if err := mergo.Merge(&ns.Proxy.Expose, defaults.Expose); err != nil { return nil, err } @@ -382,16 +388,27 @@ func mergeServiceConfig(defaults *structs.ServiceConfigResponse, service *struct continue } - // default the upstreams gateway mode if it didn't specify one - if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { - us.MeshGateway.Mode = ns.Proxy.MeshGateway.Mode - } - usCfg, ok := defaults.UpstreamIDConfigs.GetUpstreamConfig(us.DestinationID()) if !ok { // No config defaults to merge continue } + + // MeshGateway mode is fetched separately since it is a first class field and not read from us.Config + parsed, err := structs.ParseUpstreamConfig(usCfg) + if err != nil { + return nil, fmt.Errorf("failed to parse upstream config map for %s: %v", us.Identifier(), err) + } + + // The local upstream config mode has the highest precedence, so only overwrite when it's set to the default + if us.MeshGateway.Mode == structs.MeshGatewayModeDefault { + us.MeshGateway.Mode = parsed.MeshGateway.Mode + } + + // Delete the mesh gateway key since this is the only place it is read from an opaque map. + delete(usCfg, "mesh_gateway") + + // Merge in everything else that is read from the map if err := mergo.Merge(&us.Config, usCfg); err != nil { return nil, err } diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index bdda31b179..3d179377aa 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -8,11 +8,11 @@ import ( "path/filepath" "testing" - "github.com/stretchr/testify/require" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestServiceManager_RegisterService(t *testing.T) { @@ -848,3 +848,205 @@ func convertToMap(v interface{}) (map[string]interface{}, error) { return raw, nil } + +func Test_mergeServiceConfig_UpstreamOverrides(t *testing.T) { + type args struct { + defaults *structs.ServiceConfigResponse + service *structs.NodeService + } + tests := []struct { + name string + args args + want *structs.NodeService + }{ + { + name: "new config fields", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "passive_health_check": map[string]interface{}{ + "Interval": int64(10), + "MaxFailures": int64(2), + }, + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + "protocol": "grpc", + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + Config: map[string]interface{}{ + "passive_health_check": map[string]interface{}{ + "Interval": int64(10), + "MaxFailures": int64(2), + }, + "protocol": "grpc", + }, + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + }, + }, + }, + }, + }, + { + name: "upstream mode from remote defaults overrides local default", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + Config: map[string]interface{}{}, + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeLocal, + }, + }, + }, + }, + }, + }, + { + name: "mode in local upstream config overrides all", + args: args{ + defaults: &structs.ServiceConfigResponse{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + { + Upstream: structs.ServiceID{ + ID: "zap", + EnterpriseMeta: *structs.DefaultEnterpriseMeta(), + }, + Config: map[string]interface{}{ + "mesh_gateway": map[string]interface{}{ + "Mode": "local", + }, + }, + }, + }, + }, + service: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeNone, + }, + }, + }, + }, + }, + }, + want: &structs.NodeService{ + ID: "foo-proxy", + Service: "foo-proxy", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + DestinationServiceID: "foo", + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeRemote, + }, + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationNamespace: "default", + DestinationName: "zap", + Config: map[string]interface{}{}, + MeshGateway: structs.MeshGatewayConfig{ + Mode: structs.MeshGatewayModeNone, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := mergeServiceConfig(tt.args.defaults, tt.args.service) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index c43038bf3a..158264c13e 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -578,13 +578,17 @@ func (r *ConfigEntryListAllRequest) RequestDatacenter() string { // for a service. type ServiceConfigRequest struct { Name string + ID string + NodeName string Datacenter string + + UpstreamIDs []ServiceID + // DEPRECATED // Upstreams is a list of upstream service names to use for resolving the service config // UpstreamIDs should be used instead which can encode more than just the name to // uniquely identify a service. - Upstreams []string - UpstreamIDs []ServiceID + Upstreams []string EnterpriseMeta `hcl:",squash" mapstructure:",squash"` QueryOptions