diff --git a/agent/agent.go b/agent/agent.go index c898c864e9..76fd923d4a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -360,10 +360,16 @@ func New(bd BaseDeps) (*Agent, error) { } cacheName := cachetype.HealthServicesName - if bd.RuntimeConfig.CacheUseStreamingBackend { + if bd.RuntimeConfig.UseStreamingBackend { cacheName = cachetype.StreamingHealthServicesName } - a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a, CacheName: cacheName} + a.rpcClientHealth = &health.Client{ + Cache: bd.Cache, + NetRPC: &a, + CacheName: cacheName, + // Temporarily until streaming supports all connect events + CacheNameConnect: cachetype.HealthServicesName, + } a.serviceManager = NewServiceManager(&a) diff --git a/agent/config/builder.go b/agent/config/builder.go index e2ec9ac037..5f6292d949 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1093,7 +1093,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { Watches: c.Watches, } - rt.CacheUseStreamingBackend = b.boolVal(c.Cache.UseStreamingBackend) + rt.UseStreamingBackend = b.boolVal(c.UseStreamingBackend) if rt.Cache.EntryFetchMaxBurst <= 0 { return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst) diff --git a/agent/config/config.go b/agent/config/config.go index b5254d7092..41af5360e0 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -103,9 +103,6 @@ type Cache struct { EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"` // EntryFetchRate represents the max calls/sec for a single cache entry EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"` - // UseStreamingBackend instead of blocking queries to populate the cache. - // Only supported by some cache types. - UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"` } // Config defines the format of a configuration file in either JSON or @@ -264,6 +261,10 @@ type Config struct { RPC RPC `mapstructure:"rpc"` + // UseStreamingBackend instead of blocking queries for service health and + // any other endpoints which support streaming. + UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"` + // This isn't used by Consul but we've documented a feature where users // can deploy their snapshot agent configs alongside their Consul configs // so we have a placeholder here so it can be parsed but this doesn't diff --git a/agent/config/runtime.go b/agent/config/runtime.go index c3a7e3ec9b..1fd3db430c 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -937,7 +937,9 @@ type RuntimeConfig struct { RPCConfig consul.RPCConfig - CacheUseStreamingBackend bool + // UseStreamingBackend enables streaming as a replacement for agent/cache + // in the client agent for endpoints which support streaming. + UseStreamingBackend bool // RaftProtocol sets the Raft protocol version to use on this server. // Defaults to 3. diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index dc3a7a6d3f..42adf5986f 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -4895,9 +4895,9 @@ func TestFullConfig(t *testing.T) { "bootstrap_expect": 53, "cache": { "entry_fetch_max_burst": 42, - "entry_fetch_rate": 0.334, - "use_streaming_backend": true + "entry_fetch_rate": 0.334 }, + "use_streaming_backend": true, "ca_file": "erA7T0PM", "ca_path": "mQEN1Mfp", "cert_file": "7s4QAzDk", @@ -5581,8 +5581,8 @@ func TestFullConfig(t *testing.T) { cache = { entry_fetch_max_burst = 42 entry_fetch_rate = 0.334 - use_streaming_backend = true }, + use_streaming_backend = true ca_file = "erA7T0PM" ca_path = "mQEN1Mfp" cert_file = "7s4QAzDk" @@ -6877,17 +6877,17 @@ func TestFullConfig(t *testing.T) { }, }, }, - CacheUseStreamingBackend: true, - SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"), - SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"), - SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"), - SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"), - SerfAllowedCIDRsLAN: []net.IPNet{}, - SerfAllowedCIDRsWAN: []net.IPNet{}, - SessionTTLMin: 26627 * time.Second, - SkipLeaveOnInt: true, - StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"}, - StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"}, + UseStreamingBackend: true, + SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"), + SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"), + SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"), + SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"), + SerfAllowedCIDRsLAN: []net.IPNet{}, + SerfAllowedCIDRsWAN: []net.IPNet{}, + SessionTTLMin: 26627 * time.Second, + SkipLeaveOnInt: true, + StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"}, + StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"}, Telemetry: lib.TelemetryConfig{ CirconusAPIApp: "p4QOTe9j", CirconusAPIToken: "E3j35V23", @@ -7527,7 +7527,7 @@ func TestSanitize(t *testing.T) { "SerfBindAddrWAN": "", "SerfPortLAN": 0, "SerfPortWAN": 0, - "CacheUseStreamingBackend": false, + "UseStreamingBackend": false, "ServerMode": false, "ServerName": "", "ServerPort": 0, diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index d7076dacd0..2c7cc5cd41 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -3,9 +3,10 @@ package state import ( "fmt" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/proto/pbsubscribe" - "github.com/hashicorp/go-memdb" ) // ReadTxn is implemented by memdb.Txn to perform read operations. @@ -183,7 +184,9 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), - topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), + topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + // The connect topic is temporarily disabled until the correct events are + // created for terminating gateway changes. + //topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), } } diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index af935f1ba8..b37c47e16a 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,8 +219,8 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - // TODO: handle this for all endpoints in parseConsistency - args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && args.QueryOptions.UseCache + useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 + args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args) if err != nil { diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 09fe452ab6..0118a363cd 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -8,9 +8,12 @@ import ( ) type Client struct { - NetRPC NetRPC - Cache CacheGetter + NetRPC NetRPC + Cache CacheGetter + // CacheName to use for service health. CacheName string + // CacheNameConnect is the name of the cache to use for connect service health. + CacheNameConnect string } type NetRPC interface { @@ -51,7 +54,12 @@ func (c *Client) getServiceNodes( return out, cache.ResultMeta{}, err } - raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) + cacheName := c.CacheName + if req.Connect { + cacheName = c.CacheNameConnect + } + + raw, md, err := c.Cache.Get(ctx, cacheName, &req) if err != nil { return out, md, err } diff --git a/agent/setup.go b/agent/setup.go index c7fe7f523b..96265ef24a 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -122,7 +122,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) // function is for registering newer cache-types which no longer have a dependency // on Agent. func registerCacheTypes(bd BaseDeps) error { - if bd.RuntimeConfig.CacheUseStreamingBackend { + if bd.RuntimeConfig.UseStreamingBackend { conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) if err != nil { return err diff --git a/website/pages/docs/agent/options.mdx b/website/pages/docs/agent/options.mdx index c31069c2c0..4206613470 100644 --- a/website/pages/docs/agent/options.mdx +++ b/website/pages/docs/agent/options.mdx @@ -1151,13 +1151,6 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." The default value is "No limit" and should be tuned on large clusters to avoid performing too many RPCs on entries changing a lot. - - `use_streaming_backend` when enabled Consul client agents will use streaming rpc to - populate the cache, instead of the traditional blocking queries. All servers must - have [`rpc.enable_streaming`](#rpc_enable_streaming) enabled before any client can enable `use_streaming_backend`. - At least one of [`dns.use_cache`](#dns_use_cache) or - [`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise - this setting has no effect. - - `ca_file` This provides a file path to a PEM-encoded certificate authority. The certificate authority is used to check the authenticity of client and server connections with the appropriate [`verify_incoming`](#verify_incoming) @@ -1834,7 +1827,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `enable_streaming` ((#rpc_enable_streaming)) enables the gRPC subscribe endpoint on a Consul Server. All servers in all federated datacenters must have this enabled before any client can use - [`cache.use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul. + [`use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul. - `segment` - Equivalent to the [`-segment` command-line flag](#_segment). @@ -2162,6 +2155,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." currently only supports numeric IDs. - `mode` - The permission bits to set on the file. +- `use_streaming_backend` when enabled Consul client agents will use streaming rpc to + populate, instead of the traditional blocking queries, for endpoints which support + streaming. All servers must have [`rpc.enable_streaming`](#rpc_enable_streaming) + enabled before any client can enable `use_streaming_backend`. + At least one of [`dns.use_cache`](#dns_use_cache) or + [`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise + this setting has no effect. + - `verify_incoming` - If set to true, Consul requires that all incoming connections make use of TLS and that the client provides a certificate signed by a Certificate Authority from the