mirror of https://github.com/hashicorp/consul
Merge pull request #9026 from hashicorp/dnephin/streaming-without-cache-query-param
streaming: rename config and remove requirement for cache=1pull/9058/head
commit
7b9ee25956
|
@ -360,10 +360,16 @@ func New(bd BaseDeps) (*Agent, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheName := cachetype.HealthServicesName
|
cacheName := cachetype.HealthServicesName
|
||||||
if bd.RuntimeConfig.CacheUseStreamingBackend {
|
if bd.RuntimeConfig.UseStreamingBackend {
|
||||||
cacheName = cachetype.StreamingHealthServicesName
|
cacheName = cachetype.StreamingHealthServicesName
|
||||||
}
|
}
|
||||||
a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a, CacheName: cacheName}
|
a.rpcClientHealth = &health.Client{
|
||||||
|
Cache: bd.Cache,
|
||||||
|
NetRPC: &a,
|
||||||
|
CacheName: cacheName,
|
||||||
|
// Temporarily until streaming supports all connect events
|
||||||
|
CacheNameConnect: cachetype.HealthServicesName,
|
||||||
|
}
|
||||||
|
|
||||||
a.serviceManager = NewServiceManager(&a)
|
a.serviceManager = NewServiceManager(&a)
|
||||||
|
|
||||||
|
|
|
@ -1093,7 +1093,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
||||||
Watches: c.Watches,
|
Watches: c.Watches,
|
||||||
}
|
}
|
||||||
|
|
||||||
rt.CacheUseStreamingBackend = b.boolVal(c.Cache.UseStreamingBackend)
|
rt.UseStreamingBackend = b.boolVal(c.UseStreamingBackend)
|
||||||
|
|
||||||
if rt.Cache.EntryFetchMaxBurst <= 0 {
|
if rt.Cache.EntryFetchMaxBurst <= 0 {
|
||||||
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst)
|
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst)
|
||||||
|
|
|
@ -103,9 +103,6 @@ type Cache struct {
|
||||||
EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"`
|
EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"`
|
||||||
// EntryFetchRate represents the max calls/sec for a single cache entry
|
// EntryFetchRate represents the max calls/sec for a single cache entry
|
||||||
EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"`
|
EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"`
|
||||||
// UseStreamingBackend instead of blocking queries to populate the cache.
|
|
||||||
// Only supported by some cache types.
|
|
||||||
UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Config defines the format of a configuration file in either JSON or
|
// Config defines the format of a configuration file in either JSON or
|
||||||
|
@ -264,6 +261,10 @@ type Config struct {
|
||||||
|
|
||||||
RPC RPC `mapstructure:"rpc"`
|
RPC RPC `mapstructure:"rpc"`
|
||||||
|
|
||||||
|
// UseStreamingBackend instead of blocking queries for service health and
|
||||||
|
// any other endpoints which support streaming.
|
||||||
|
UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"`
|
||||||
|
|
||||||
// This isn't used by Consul but we've documented a feature where users
|
// This isn't used by Consul but we've documented a feature where users
|
||||||
// can deploy their snapshot agent configs alongside their Consul configs
|
// can deploy their snapshot agent configs alongside their Consul configs
|
||||||
// so we have a placeholder here so it can be parsed but this doesn't
|
// so we have a placeholder here so it can be parsed but this doesn't
|
||||||
|
|
|
@ -937,7 +937,9 @@ type RuntimeConfig struct {
|
||||||
|
|
||||||
RPCConfig consul.RPCConfig
|
RPCConfig consul.RPCConfig
|
||||||
|
|
||||||
CacheUseStreamingBackend bool
|
// UseStreamingBackend enables streaming as a replacement for agent/cache
|
||||||
|
// in the client agent for endpoints which support streaming.
|
||||||
|
UseStreamingBackend bool
|
||||||
|
|
||||||
// RaftProtocol sets the Raft protocol version to use on this server.
|
// RaftProtocol sets the Raft protocol version to use on this server.
|
||||||
// Defaults to 3.
|
// Defaults to 3.
|
||||||
|
|
|
@ -4895,9 +4895,9 @@ func TestFullConfig(t *testing.T) {
|
||||||
"bootstrap_expect": 53,
|
"bootstrap_expect": 53,
|
||||||
"cache": {
|
"cache": {
|
||||||
"entry_fetch_max_burst": 42,
|
"entry_fetch_max_burst": 42,
|
||||||
"entry_fetch_rate": 0.334,
|
"entry_fetch_rate": 0.334
|
||||||
"use_streaming_backend": true
|
|
||||||
},
|
},
|
||||||
|
"use_streaming_backend": true,
|
||||||
"ca_file": "erA7T0PM",
|
"ca_file": "erA7T0PM",
|
||||||
"ca_path": "mQEN1Mfp",
|
"ca_path": "mQEN1Mfp",
|
||||||
"cert_file": "7s4QAzDk",
|
"cert_file": "7s4QAzDk",
|
||||||
|
@ -5581,8 +5581,8 @@ func TestFullConfig(t *testing.T) {
|
||||||
cache = {
|
cache = {
|
||||||
entry_fetch_max_burst = 42
|
entry_fetch_max_burst = 42
|
||||||
entry_fetch_rate = 0.334
|
entry_fetch_rate = 0.334
|
||||||
use_streaming_backend = true
|
|
||||||
},
|
},
|
||||||
|
use_streaming_backend = true
|
||||||
ca_file = "erA7T0PM"
|
ca_file = "erA7T0PM"
|
||||||
ca_path = "mQEN1Mfp"
|
ca_path = "mQEN1Mfp"
|
||||||
cert_file = "7s4QAzDk"
|
cert_file = "7s4QAzDk"
|
||||||
|
@ -6877,17 +6877,17 @@ func TestFullConfig(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
CacheUseStreamingBackend: true,
|
UseStreamingBackend: true,
|
||||||
SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"),
|
SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"),
|
||||||
SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"),
|
SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"),
|
||||||
SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"),
|
SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"),
|
||||||
SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"),
|
SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"),
|
||||||
SerfAllowedCIDRsLAN: []net.IPNet{},
|
SerfAllowedCIDRsLAN: []net.IPNet{},
|
||||||
SerfAllowedCIDRsWAN: []net.IPNet{},
|
SerfAllowedCIDRsWAN: []net.IPNet{},
|
||||||
SessionTTLMin: 26627 * time.Second,
|
SessionTTLMin: 26627 * time.Second,
|
||||||
SkipLeaveOnInt: true,
|
SkipLeaveOnInt: true,
|
||||||
StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"},
|
StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"},
|
||||||
StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"},
|
StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"},
|
||||||
Telemetry: lib.TelemetryConfig{
|
Telemetry: lib.TelemetryConfig{
|
||||||
CirconusAPIApp: "p4QOTe9j",
|
CirconusAPIApp: "p4QOTe9j",
|
||||||
CirconusAPIToken: "E3j35V23",
|
CirconusAPIToken: "E3j35V23",
|
||||||
|
@ -7527,7 +7527,7 @@ func TestSanitize(t *testing.T) {
|
||||||
"SerfBindAddrWAN": "",
|
"SerfBindAddrWAN": "",
|
||||||
"SerfPortLAN": 0,
|
"SerfPortLAN": 0,
|
||||||
"SerfPortWAN": 0,
|
"SerfPortWAN": 0,
|
||||||
"CacheUseStreamingBackend": false,
|
"UseStreamingBackend": false,
|
||||||
"ServerMode": false,
|
"ServerMode": false,
|
||||||
"ServerName": "",
|
"ServerName": "",
|
||||||
"ServerPort": 0,
|
"ServerPort": 0,
|
||||||
|
|
|
@ -3,9 +3,10 @@ package state
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||||
"github.com/hashicorp/go-memdb"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ReadTxn is implemented by memdb.Txn to perform read operations.
|
// ReadTxn is implemented by memdb.Txn to perform read operations.
|
||||||
|
@ -183,7 +184,9 @@ func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||||
|
|
||||||
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
func newSnapshotHandlers(s *Store) stream.SnapshotHandlers {
|
||||||
return stream.SnapshotHandlers{
|
return stream.SnapshotHandlers{
|
||||||
topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
|
topicServiceHealth: serviceHealthSnapshot(s, topicServiceHealth),
|
||||||
topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
|
// The connect topic is temporarily disabled until the correct events are
|
||||||
|
// created for terminating gateway changes.
|
||||||
|
//topicServiceHealthConnect: serviceHealthSnapshot(s, topicServiceHealthConnect),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -219,8 +219,8 @@ func (s *HTTPHandlers) healthServiceNodes(resp http.ResponseWriter, req *http.Re
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: handle this for all endpoints in parseConsistency
|
useStreaming := s.agent.config.UseStreamingBackend && args.MinQueryIndex > 0
|
||||||
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && args.QueryOptions.UseCache
|
args.QueryOptions.UseCache = s.agent.config.HTTPUseCache && (args.QueryOptions.UseCache || useStreaming)
|
||||||
|
|
||||||
out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
|
out, md, err := s.agent.rpcClientHealth.ServiceNodes(req.Context(), args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -8,9 +8,12 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type Client struct {
|
||||||
NetRPC NetRPC
|
NetRPC NetRPC
|
||||||
Cache CacheGetter
|
Cache CacheGetter
|
||||||
|
// CacheName to use for service health.
|
||||||
CacheName string
|
CacheName string
|
||||||
|
// CacheNameConnect is the name of the cache to use for connect service health.
|
||||||
|
CacheNameConnect string
|
||||||
}
|
}
|
||||||
|
|
||||||
type NetRPC interface {
|
type NetRPC interface {
|
||||||
|
@ -51,7 +54,12 @@ func (c *Client) getServiceNodes(
|
||||||
return out, cache.ResultMeta{}, err
|
return out, cache.ResultMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
|
cacheName := c.CacheName
|
||||||
|
if req.Connect {
|
||||||
|
cacheName = c.CacheNameConnect
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, md, err := c.Cache.Get(ctx, cacheName, &req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return out, md, err
|
return out, md, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
|
||||||
// function is for registering newer cache-types which no longer have a dependency
|
// function is for registering newer cache-types which no longer have a dependency
|
||||||
// on Agent.
|
// on Agent.
|
||||||
func registerCacheTypes(bd BaseDeps) error {
|
func registerCacheTypes(bd BaseDeps) error {
|
||||||
if bd.RuntimeConfig.CacheUseStreamingBackend {
|
if bd.RuntimeConfig.UseStreamingBackend {
|
||||||
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
|
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
|
@ -1151,13 +1151,6 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
||||||
The default value is "No limit" and should be tuned on large
|
The default value is "No limit" and should be tuned on large
|
||||||
clusters to avoid performing too many RPCs on entries changing a lot.
|
clusters to avoid performing too many RPCs on entries changing a lot.
|
||||||
|
|
||||||
- `use_streaming_backend` when enabled Consul client agents will use streaming rpc to
|
|
||||||
populate the cache, instead of the traditional blocking queries. All servers must
|
|
||||||
have [`rpc.enable_streaming`](#rpc_enable_streaming) enabled before any client can enable `use_streaming_backend`.
|
|
||||||
At least one of [`dns.use_cache`](#dns_use_cache) or
|
|
||||||
[`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise
|
|
||||||
this setting has no effect.
|
|
||||||
|
|
||||||
- `ca_file` This provides a file path to a PEM-encoded certificate
|
- `ca_file` This provides a file path to a PEM-encoded certificate
|
||||||
authority. The certificate authority is used to check the authenticity of client
|
authority. The certificate authority is used to check the authenticity of client
|
||||||
and server connections with the appropriate [`verify_incoming`](#verify_incoming)
|
and server connections with the appropriate [`verify_incoming`](#verify_incoming)
|
||||||
|
@ -1834,7 +1827,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
||||||
|
|
||||||
- `enable_streaming` ((#rpc_enable_streaming)) enables the gRPC subscribe endpoint on a Consul Server. All
|
- `enable_streaming` ((#rpc_enable_streaming)) enables the gRPC subscribe endpoint on a Consul Server. All
|
||||||
servers in all federated datacenters must have this enabled before any client can use
|
servers in all federated datacenters must have this enabled before any client can use
|
||||||
[`cache.use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul.
|
[`use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul.
|
||||||
|
|
||||||
- `segment` <EnterpriseAlert inline /> - Equivalent to the [`-segment` command-line flag](#_segment).
|
- `segment` <EnterpriseAlert inline /> - Equivalent to the [`-segment` command-line flag](#_segment).
|
||||||
|
|
||||||
|
@ -2162,6 +2155,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
|
||||||
currently only supports numeric IDs.
|
currently only supports numeric IDs.
|
||||||
- `mode` - The permission bits to set on the file.
|
- `mode` - The permission bits to set on the file.
|
||||||
|
|
||||||
|
- `use_streaming_backend` when enabled Consul client agents will use streaming rpc to
|
||||||
|
populate, instead of the traditional blocking queries, for endpoints which support
|
||||||
|
streaming. All servers must have [`rpc.enable_streaming`](#rpc_enable_streaming)
|
||||||
|
enabled before any client can enable `use_streaming_backend`.
|
||||||
|
At least one of [`dns.use_cache`](#dns_use_cache) or
|
||||||
|
[`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise
|
||||||
|
this setting has no effect.
|
||||||
|
|
||||||
- `verify_incoming` - If set to true, Consul
|
- `verify_incoming` - If set to true, Consul
|
||||||
requires that all incoming connections make use of TLS and that the client
|
requires that all incoming connections make use of TLS and that the client
|
||||||
provides a certificate signed by a Certificate Authority from the
|
provides a certificate signed by a Certificate Authority from the
|
||||||
|
|
Loading…
Reference in New Issue