From a62dcc9bfec356bf9b861b94e4c7c402ab53dfef Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 23 Oct 2020 17:39:55 -0400 Subject: [PATCH 1/4] health: use streaming, even when cache=1 is not set --- agent/health_endpoint.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index af935f1ba8..2f50147ee3 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,8 +219,8 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - // TODO: handle this for all endpoints in parseConsistency - args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && args.QueryOptions.UseCache + useStreaming := s.agent.config.CacheUseStreamingBackend && args.MinQueryIndex > 0 + args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args) if err != nil { From 853667e7d82f48856f360e87b6e96ba31393234a Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 23 Oct 2020 17:47:01 -0400 Subject: [PATCH 2/4] health: change the name of UseStreamingBackend config Remove it from the cache section, and update the docs. 
--- agent/agent.go | 2 +- agent/config/builder.go | 2 +- agent/config/config.go | 7 ++++--- agent/config/runtime.go | 4 +++- agent/config/runtime_test.go | 30 ++++++++++++++-------------- agent/health_endpoint.go | 2 +- agent/setup.go | 2 +- website/pages/docs/agent/options.mdx | 17 ++++++++-------- 8 files changed, 35 insertions(+), 31 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index c898c864e9..ae2371cf3f 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -360,7 +360,7 @@ func New(bd BaseDeps) (*Agent, error) { } cacheName := cachetype.HealthServicesName - if bd.RuntimeConfig.CacheUseStreamingBackend { + if bd.RuntimeConfig.UseStreamingBackend { cacheName = cachetype.StreamingHealthServicesName } a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a, CacheName: cacheName} diff --git a/agent/config/builder.go b/agent/config/builder.go index e2ec9ac037..5f6292d949 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1093,7 +1093,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { Watches: c.Watches, } - rt.CacheUseStreamingBackend = b.boolVal(c.Cache.UseStreamingBackend) + rt.UseStreamingBackend = b.boolVal(c.UseStreamingBackend) if rt.Cache.EntryFetchMaxBurst <= 0 { return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst) diff --git a/agent/config/config.go b/agent/config/config.go index b5254d7092..41af5360e0 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -103,9 +103,6 @@ type Cache struct { EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"` // EntryFetchRate represents the max calls/sec for a single cache entry EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"` - // UseStreamingBackend instead of blocking queries to populate the cache. - // Only supported by some cache types. 
- UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"` } // Config defines the format of a configuration file in either JSON or @@ -264,6 +261,10 @@ type Config struct { RPC RPC `mapstructure:"rpc"` + // UseStreamingBackend instead of blocking queries for service health and + // any other endpoints which support streaming. + UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"` + // This isn't used by Consul but we've documented a feature where users // can deploy their snapshot agent configs alongside their Consul configs // so we have a placeholder here so it can be parsed but this doesn't diff --git a/agent/config/runtime.go b/agent/config/runtime.go index c3a7e3ec9b..1fd3db430c 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -937,7 +937,9 @@ type RuntimeConfig struct { RPCConfig consul.RPCConfig - CacheUseStreamingBackend bool + // UseStreamingBackend enables streaming as a replacement for agent/cache + // in the client agent for endpoints which support streaming. + UseStreamingBackend bool // RaftProtocol sets the Raft protocol version to use on this server. // Defaults to 3. 
diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index dc3a7a6d3f..42adf5986f 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -4895,9 +4895,9 @@ func TestFullConfig(t *testing.T) { "bootstrap_expect": 53, "cache": { "entry_fetch_max_burst": 42, - "entry_fetch_rate": 0.334, - "use_streaming_backend": true + "entry_fetch_rate": 0.334 }, + "use_streaming_backend": true, "ca_file": "erA7T0PM", "ca_path": "mQEN1Mfp", "cert_file": "7s4QAzDk", @@ -5581,8 +5581,8 @@ func TestFullConfig(t *testing.T) { cache = { entry_fetch_max_burst = 42 entry_fetch_rate = 0.334 - use_streaming_backend = true }, + use_streaming_backend = true ca_file = "erA7T0PM" ca_path = "mQEN1Mfp" cert_file = "7s4QAzDk" @@ -6877,17 +6877,17 @@ func TestFullConfig(t *testing.T) { }, }, }, - CacheUseStreamingBackend: true, - SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"), - SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"), - SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"), - SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"), - SerfAllowedCIDRsLAN: []net.IPNet{}, - SerfAllowedCIDRsWAN: []net.IPNet{}, - SessionTTLMin: 26627 * time.Second, - SkipLeaveOnInt: true, - StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"}, - StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"}, + UseStreamingBackend: true, + SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"), + SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"), + SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"), + SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"), + SerfAllowedCIDRsLAN: []net.IPNet{}, + SerfAllowedCIDRsWAN: []net.IPNet{}, + SessionTTLMin: 26627 * time.Second, + SkipLeaveOnInt: true, + StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"}, + StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"}, Telemetry: lib.TelemetryConfig{ CirconusAPIApp: "p4QOTe9j", CirconusAPIToken: "E3j35V23", @@ -7527,7 +7527,7 @@ func TestSanitize(t *testing.T) { "SerfBindAddrWAN": "", "SerfPortLAN": 0, "SerfPortWAN": 0, - 
"CacheUseStreamingBackend": false, + "UseStreamingBackend": false, "ServerMode": false, "ServerName": "", "ServerPort": 0, diff --git a/agent/health_endpoint.go b/agent/health_endpoint.go index 2f50147ee3..b37c47e16a 100644 --- a/agent/health_endpoint.go +++ b/agent/health_endpoint.go @@ -219,7 +219,7 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re return nil, nil } - useStreaming := s.agent.config.CacheUseStreamingBackend && args.MinQueryIndex > 0 + useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0 args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming) out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args) diff --git a/agent/setup.go b/agent/setup.go index 7c65777c9c..bd7b68e97d 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -120,7 +120,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) // function is for registering newer cache-types which no longer have a dependency // on Agent. func registerCacheTypes(bd BaseDeps) error { - if bd.RuntimeConfig.CacheUseStreamingBackend { + if bd.RuntimeConfig.UseStreamingBackend { conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter) if err != nil { return err diff --git a/website/pages/docs/agent/options.mdx b/website/pages/docs/agent/options.mdx index c31069c2c0..4206613470 100644 --- a/website/pages/docs/agent/options.mdx +++ b/website/pages/docs/agent/options.mdx @@ -1151,13 +1151,6 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." The default value is "No limit" and should be tuned on large clusters to avoid performing too many RPCs on entries changing a lot. - - `use_streaming_backend` when enabled Consul client agents will use streaming rpc to - populate the cache, instead of the traditional blocking queries. 
All servers must - have [`rpc.enable_streaming`](#rpc_enable_streaming) enabled before any client can enable `use_streaming_backend`. - At least one of [`dns.use_cache`](#dns_use_cache) or - [`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise - this setting has no effect. - - `ca_file` This provides a file path to a PEM-encoded certificate authority. The certificate authority is used to check the authenticity of client and server connections with the appropriate [`verify_incoming`](#verify_incoming) @@ -1834,7 +1827,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." - `enable_streaming` ((#rpc_enable_streaming)) enables the gRPC subscribe endpoint on a Consul Server. All servers in all federated datacenters must have this enabled before any client can use - [`cache.use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul. + [`use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul. - `segment` - Equivalent to the [`-segment` command-line flag](#_segment). @@ -2162,6 +2155,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'." currently only supports numeric IDs. - `mode` - The permission bits to set on the file. +- `use_streaming_backend` when enabled Consul client agents will use streaming rpc, + instead of the traditional blocking queries, for endpoints which support + streaming. All servers must have [`rpc.enable_streaming`](#rpc_enable_streaming) + enabled before any client can enable `use_streaming_backend`. + At least one of [`dns.use_cache`](#dns_use_cache) or + [`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise + this setting has no effect. 
+ - `verify_incoming` - If set to true, Consul requires that all incoming connections make use of TLS and that the client provides a certificate signed by a Certificate Authority from the From c398a6b272b8bcc693601b5be29b41f1b8be4716 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 26 Oct 2020 11:42:27 -0400 Subject: [PATCH 3/4] state: disable streaming connect topic --- agent/consul/state/memdb.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/agent/consul/state/memdb.go b/agent/consul/state/memdb.go index 5e6bbb604a..ded68636fd 100644 --- a/agent/consul/state/memdb.go +++ b/agent/consul/state/memdb.go @@ -3,9 +3,10 @@ package state import ( "fmt" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/proto/pbsubscribe" - "github.com/hashicorp/go-memdb" ) // ReadTxn is implemented by memdb.Txn to perform read operations. @@ -179,7 +180,9 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { func newSnapshotHandlers(s *Store) stream.SnapshotHandlers { return stream.SnapshotHandlers{ - topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), - topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), + topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth), + // The connect topic is temporarily disabled until the correct events are + // created for terminating gateway changes. + //topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect), } } From bd44952c2e6138dd3f8471a09463abddce17b4c5 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 26 Oct 2020 11:55:49 -0400 Subject: [PATCH 4/4] streaming: disable streaming when requesting connect events Until the correct events are created for terminating gateways. 
--- agent/agent.go | 8 +++++++- agent/rpcclient/health/health.go | 14 +++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index ae2371cf3f..76fd923d4a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -363,7 +363,13 @@ func New(bd BaseDeps) (*Agent, error) { if bd.RuntimeConfig.UseStreamingBackend { cacheName = cachetype.StreamingHealthServicesName } - a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a, CacheName: cacheName} + a.rpcClientHealth = &health.Client{ + Cache: bd.Cache, + NetRPC: &a, + CacheName: cacheName, + // Temporarily use the non-streaming cache type for connect queries, until streaming supports all connect events. + CacheNameConnect: cachetype.HealthServicesName, + } a.serviceManager = NewServiceManager(&a) diff --git a/agent/rpcclient/health/health.go b/agent/rpcclient/health/health.go index 09fe452ab6..0118a363cd 100644 --- a/agent/rpcclient/health/health.go +++ b/agent/rpcclient/health/health.go @@ -8,9 +8,12 @@ import ( ) type Client struct { - NetRPC NetRPC - Cache CacheGetter + NetRPC NetRPC + Cache CacheGetter + // CacheName to use for service health. CacheName string + // CacheNameConnect is the name of the cache to use for connect service health. + CacheNameConnect string } type NetRPC interface { @@ -51,7 +54,12 @@ func (c *Client) getServiceNodes( return out, cache.ResultMeta{}, err } - raw, md, err := c.Cache.Get(ctx, c.CacheName, &req) + cacheName := c.CacheName + if req.Connect { + cacheName = c.CacheNameConnect + } + + raw, md, err := c.Cache.Get(ctx, cacheName, &req) if err != nil { return out, md, err }