Merge pull request #8825 from hashicorp/streaming/add-config

streaming: add config and docs
pull/8912/head
Daniel Nephin 2020-10-09 14:33:58 -04:00 committed by GitHub
commit ea77eccb14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 146 additions and 46 deletions

View File

@ -359,7 +359,11 @@ func New(bd BaseDeps) (*Agent, error) {
cache: bd.Cache,
}
a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a}
cacheName := cachetype.HealthServicesName
if bd.RuntimeConfig.CacheUseStreamingBackend {
cacheName = cachetype.StreamingHealthServicesName
}
a.rpcClientHealth = &health.Client{Cache: bd.Cache, NetRPC: &a, CacheName: cacheName}
a.serviceManager = NewServiceManager(&a)
@ -1140,6 +1144,8 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
// copy it whatever the value.
cfg.RPCHoldTimeout = runtimeCfg.RPCHoldTimeout
cfg.RPCConfig = runtimeCfg.RPCConfig
if runtimeCfg.LeaveDrainTime > 0 {
cfg.LeaveDrainTime = runtimeCfg.LeaveDrainTime
}
@ -3687,10 +3693,11 @@ func (a *Agent) LocalBlockingQuery(alwaysBlock bool, hash string, wait time.Dura
}
}
// registerCache configures the cache and registers all the supported
// types onto the cache. This is NOT safe to call multiple times so
// care should be taken to call this exactly once after the cache
// field has been initialized.
// registerCache types on a.cache.
// This function may only be called once from New.
//
// Note: this function no longer registered all cache-types. Newer cache-types
// that do not depend on Agent are registered from registerCacheTypes.
func (a *Agent) registerCache() {
// Note that you should register the _agent_ as the RPC implementation and not
// the a.delegate directly, otherwise tests that rely on overriding RPC

View File

@ -28,6 +28,19 @@ type StreamingHealthServices struct {
deps MaterializerDeps
}
// RegisterOptions returns options with a much shorter LastGetTTL than the default.
// Unlike other cache-types, StreamingHealthServices runs a materialized view in
// the background which will receive streamed events from a server. If the cache
// is not being used, that stream uses memory on the server and network transfer
// between the client and the server.
// The materialize view and the stream are stopped when the cache entry expires,
// so using a shorter TTL ensures the cache entry expires sooner.
func (c *StreamingHealthServices) RegisterOptions() cache.RegisterOptions {
opts := c.RegisterOptionsBlockingRefresh.RegisterOptions()
opts.LastGetTTL = 10 * time.Minute
return opts
}
// NewStreamingHealthServices creates a cache-type for watching for service
// health results via streaming updates.
func NewStreamingHealthServices(deps MaterializerDeps) *StreamingHealthServices {

View File

@ -16,6 +16,13 @@ import (
"strings"
"time"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/connect/ca"
@ -30,12 +37,6 @@ import (
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-sockaddr/template"
"github.com/hashicorp/memberlist"
"golang.org/x/time/rate"
)
// Load will build the configuration including the extraHead source injected
@ -1042,6 +1043,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
RPCMaxConnsPerClient: b.intVal(c.Limits.RPCMaxConnsPerClient),
RPCProtocol: b.intVal(c.RPCProtocol),
RPCRateLimit: rate.Limit(b.float64Val(c.Limits.RPCRate)),
RPCConfig: consul.RPCConfig{EnableStreaming: b.boolVal(c.RPC.EnableStreaming)},
RaftProtocol: b.intVal(c.RaftProtocol),
RaftSnapshotThreshold: b.intVal(c.RaftSnapshotThreshold),
RaftSnapshotInterval: b.durationVal("raft_snapshot_interval", c.RaftSnapshotInterval),
@ -1091,6 +1093,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
Watches: c.Watches,
}
rt.CacheUseStreamingBackend = b.boolVal(c.Cache.UseStreamingBackend)
if rt.Cache.EntryFetchMaxBurst <= 0 {
return RuntimeConfig{}, fmt.Errorf("cache.entry_fetch_max_burst must be strictly positive, was: %v", rt.Cache.EntryFetchMaxBurst)
}

View File

@ -4,9 +4,10 @@ import (
"encoding/json"
"fmt"
"github.com/hashicorp/consul/lib/decode"
"github.com/hashicorp/hcl"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul/lib/decode"
)
const (
@ -96,12 +97,15 @@ func (l LiteralSource) Parse() (Config, mapstructure.Metadata, error) {
return l.Config, mapstructure.Metadata{}, nil
}
// Cache is the tunning configuration for cache, values are optional
// Cache configuration for the agent/cache.
type Cache struct {
// EntryFetchMaxBurst max burst size of RateLimit for a single cache entry
EntryFetchMaxBurst *int `json:"entry_fetch_max_burst,omitempty" hcl:"entry_fetch_max_burst" mapstructure:"entry_fetch_max_burst"`
// EntryFetchRate represents the max calls/sec for a single cache entry
EntryFetchRate *float64 `json:"entry_fetch_rate,omitempty" hcl:"entry_fetch_rate" mapstructure:"entry_fetch_rate"`
// UseStreamingBackend instead of blocking queries to populate the cache.
// Only supported by some cache types.
UseStreamingBackend *bool `json:"use_streaming_backend" hcl:"use_streaming_backend" mapstructure:"use_streaming_backend"`
}
// Config defines the format of a configuration file in either JSON or
@ -258,6 +262,8 @@ type Config struct {
VerifyServerHostname *bool `json:"verify_server_hostname,omitempty" hcl:"verify_server_hostname" mapstructure:"verify_server_hostname"`
Watches []map[string]interface{} `json:"watches,omitempty" hcl:"watches" mapstructure:"watches"`
RPC RPC `mapstructure:"rpc"`
// This isn't used by Consul but we've documented a feature where users
// can deploy their snapshot agent configs alongside their Consul configs
// so we have a placeholder here so it can be parsed but this doesn't
@ -798,3 +804,7 @@ type RawUIMetricsProxyAddHeader struct {
Name *string `json:"name,omitempty" hcl:"name" mapstructure:"name"`
Value *string `json:"value,omitempty" hcl:"value" mapstructure:"value"`
}
type RPC struct {
EnableStreaming *bool `json:"enable_streaming" hcl:"enable_streaming" mapstructure:"enable_streaming"`
}

View File

@ -7,7 +7,11 @@ import (
"strings"
"time"
"github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
@ -15,8 +19,6 @@ import (
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
"golang.org/x/time/rate"
)
type RuntimeSOAConfig struct {
@ -933,6 +935,10 @@ type RuntimeConfig struct {
// hcl: protocol = int
RPCProtocol int
RPCConfig consul.RPCConfig
CacheUseStreamingBackend bool
// RaftProtocol sets the Raft protocol version to use on this server.
// Defaults to 3.
//

View File

@ -18,15 +18,17 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
)
type configTest struct {
@ -4893,7 +4895,8 @@ func TestFullConfig(t *testing.T) {
"bootstrap_expect": 53,
"cache": {
"entry_fetch_max_burst": 42,
"entry_fetch_rate": 0.334
"entry_fetch_rate": 0.334,
"use_streaming_backend": true
},
"ca_file": "erA7T0PM",
"ca_path": "mQEN1Mfp",
@ -5130,6 +5133,7 @@ func TestFullConfig(t *testing.T) {
"retry_join_wan": [ "PFsR02Ye", "rJdQIhER" ],
"retry_max": 913,
"retry_max_wan": 23160,
"rpc": {"enable_streaming": true},
"segment": "BC2NhTDi",
"segments": [
{
@ -5577,6 +5581,7 @@ func TestFullConfig(t *testing.T) {
cache = {
entry_fetch_max_burst = 42
entry_fetch_rate = 0.334
use_streaming_backend = true
},
ca_file = "erA7T0PM"
ca_path = "mQEN1Mfp"
@ -5816,6 +5821,9 @@ func TestFullConfig(t *testing.T) {
retry_join_wan = [ "PFsR02Ye", "rJdQIhER" ]
retry_max = 913
retry_max_wan = 23160
rpc {
enable_streaming = true
}
segment = "BC2NhTDi"
segments = [
{
@ -6573,6 +6581,7 @@ func TestFullConfig(t *testing.T) {
RetryJoinMaxAttemptsLAN: 913,
RetryJoinMaxAttemptsWAN: 23160,
RetryJoinWAN: []string{"PFsR02Ye", "rJdQIhER"},
RPCConfig: consul.RPCConfig{EnableStreaming: true},
SegmentName: "BC2NhTDi",
Segments: []structs.NetworkSegment{
{
@ -6868,16 +6877,17 @@ func TestFullConfig(t *testing.T) {
},
},
},
SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"),
SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"),
SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"),
SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"),
SerfAllowedCIDRsLAN: []net.IPNet{},
SerfAllowedCIDRsWAN: []net.IPNet{},
SessionTTLMin: 26627 * time.Second,
SkipLeaveOnInt: true,
StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"},
StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"},
CacheUseStreamingBackend: true,
SerfAdvertiseAddrLAN: tcpAddr("17.99.29.16:8301"),
SerfAdvertiseAddrWAN: tcpAddr("78.63.37.19:8302"),
SerfBindAddrLAN: tcpAddr("99.43.63.15:8301"),
SerfBindAddrWAN: tcpAddr("67.88.33.19:8302"),
SerfAllowedCIDRsLAN: []net.IPNet{},
SerfAllowedCIDRsWAN: []net.IPNet{},
SessionTTLMin: 26627 * time.Second,
SkipLeaveOnInt: true,
StartJoinAddrsLAN: []string{"LR3hGDoG", "MwVpZ4Up"},
StartJoinAddrsWAN: []string{"EbFSc3nA", "kwXTh623"},
Telemetry: lib.TelemetryConfig{
CirconusAPIApp: "p4QOTe9j",
CirconusAPIToken: "E3j35V23",
@ -7484,6 +7494,9 @@ func TestSanitize(t *testing.T) {
"RPCMaxConnsPerClient": 0,
"RPCProtocol": 0,
"RPCRateLimit": 0,
"RPCConfig": {
"EnableStreaming": false
},
"RaftProtocol": 0,
"RaftSnapshotInterval": "0s",
"RaftSnapshotThreshold": 0,
@ -7514,6 +7527,7 @@ func TestSanitize(t *testing.T) {
"SerfBindAddrWAN": "",
"SerfPortLAN": 0,
"SerfPortWAN": 0,
"CacheUseStreamingBackend": false,
"ServerMode": false,
"ServerName": "",
"ServerPort": 0,

View File

@ -6,6 +6,11 @@ import (
"os"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/structs"
@ -13,10 +18,6 @@ import (
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/version"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/time/rate"
)
const (
@ -480,8 +481,7 @@ type Config struct {
// AutoEncrypt.Sign requests.
AutoEncryptAllowTLS bool
// TODO: godoc, set this value from Agent
EnableGRPCServer bool
RPCConfig RPCConfig
// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
@ -649,3 +649,10 @@ func DefaultConfig() *Config {
return conf
}
// RPCConfig settings for the RPC server
//
// TODO: move many settings to this struct.
type RPCConfig struct {
EnableStreaming bool
}

View File

@ -617,7 +617,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
}
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
if !config.EnableGRPCServer {
if !config.RPCConfig.EnableStreaming {
return agentgrpc.NoOpHandler{Logger: deps.Logger}
}

View File

@ -4,13 +4,13 @@ import (
"context"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/structs"
)
type Client struct {
NetRPC NetRPC
Cache CacheGetter
NetRPC NetRPC
Cache CacheGetter
CacheName string
}
type NetRPC interface {
@ -51,7 +51,7 @@ func (c *Client) getServiceNodes(
return out, cache.ResultMeta{}, err
}
raw, md, err := c.Cache.Get(ctx, cachetype.HealthServicesName, &req)
raw, md, err := c.Cache.Get(ctx, c.CacheName, &req)
if err != nil {
return out, md, err
}

View File

@ -7,8 +7,12 @@ import (
"net/http"
"time"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog"
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/grpc"
@ -19,9 +23,8 @@ import (
"github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/grpclog"
)
// TODO: BaseDeps should be renamed in the future once more of Agent.Start
@ -84,7 +87,6 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
d.Cache = cache.New(cfg.Cache)
d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)
// TODO(streaming): setConfig.Scheme name for tests
builder := resolver.NewServerResolverBuilder(resolver.Config{})
resolver.RegisterWithGRPC(builder)
d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()))
@ -105,9 +107,33 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
return d, err
}
if err := registerCacheTypes(d); err != nil {
return d, err
}
return d, nil
}
// registerCacheTypes on bd.Cache.
//
// Note: most cache types are still registered in Agent.registerCache. This
// function is for registering newer cache-types which no longer have a dependency
// on Agent.
func registerCacheTypes(bd BaseDeps) error {
if bd.RuntimeConfig.CacheUseStreamingBackend {
conn, err := bd.GRPCConnPool.ClientConn(bd.RuntimeConfig.Datacenter)
if err != nil {
return err
}
matDeps := cachetype.MaterializerDeps{
Client: pbsubscribe.NewStateChangeSubscriptionClient(conn),
Logger: bd.Logger,
}
bd.Cache.RegisterType(cachetype.StreamingHealthServicesName, cachetype.NewStreamingHealthServices(matDeps))
}
return nil
}
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
var rpcSrcAddr *net.TCPAddr
if !ipaddr.IsAny(config.RPCBindAddr) {

View File

@ -55,7 +55,7 @@ There are four specific cases covered with increasing complexity:
state for client agent's RPC client.
- [ ] Add a test to `agent/agent_test.go` similar to others with prefix
`TestAgent_reloadConfig*`.
- [ ] Add documentation to `website/source/docs/agent/options.html.md`.
- [ ] Add documentation to `website/pages/docs/agent/options.mdx`.
Done! You can now use your new field in a client agent by accessing
`s.agent.Config.<FieldName>`.

View File

@ -1133,14 +1133,14 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
</Tab>
</Tabs>
- `cache` Cache configuration of agent. The configurable values are the following:
- `cache` configuration for client agents. The configurable values are the following:
- `entry_fetch_max_burst`: The size of the token bucket used to recharge the rate-limit per
- `entry_fetch_max_burst` The size of the token bucket used to recharge the rate-limit per
cache entry. The default value is 2 and means that when cache has not been updated
for a long time, 2 successive queries can be made as long as the rate-limit is not
reached.
- `entry_fetch_rate`: configures the rate-limit at which the cache may refresh a single
- `entry_fetch_rate` configures the rate-limit at which the cache may refresh a single
entry. On a cluster with many changes/s, watching changes in the cache might put high
pressure on the servers. This ensures the number of requests for a single cache entry
will never go beyond this limit, even when a given service changes every 1/100s.
@ -1151,6 +1151,13 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
The default value is "No limit" and should be tuned on large
clusters to avoid performing too many RPCs on entries changing a lot.
- `use_streaming_backend` when enabled Consul client agents will use streaming rpc to
populate the cache, instead of the traditional blocking queries. All servers must
have [`rpc.enable_streaming`](#rpc_enable_streaming) enabled before any client can enable `use_streaming_backend`.
At least one of [`dns.use_cache`](#dns_use_cache) or
[`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise
this setting has no effect.
- `ca_file` This provides a file path to a PEM-encoded certificate
authority. The certificate authority is used to check the authenticity of client
and server connections with the appropriate [`verify_incoming`](#verify_incoming)
@ -1623,7 +1630,7 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `allow_write_http_from` This object is a list of networks in CIDR notation (eg "127.0.0.0/8") that are allowed to call the agent write endpoints. It defaults to an empty list, which means all networks are allowed. This is used to make the agent read-only, except for select ip ranges. - To block write calls from anywhere, use `[ "255.255.255.255/32" ]`. - To only allow write calls from localhost, use `[ "127.0.0.0/8" ]` - To only allow specific IPs, use `[ "10.0.0.1/32", "10.0.0.2/32" ]`
- `use_cache` Defaults to true. If disabled, the agent won't be using [agent caching](/api/features/caching) to answer the request. Even when the url parameter is provided.
- `use_cache` ((#http_config_use_cache)) Defaults to true. If disabled, the agent won't be using [agent caching](/api/features/caching) to answer the request. Even when the url parameter is provided.
- `leave_on_terminate` If enabled, when the agent receives a TERM signal, it will send a `Leave` message to the rest of the cluster and gracefully leave. The default behavior for this feature varies based on whether or not the agent is running as a client or a server (prior to Consul 0.7 the default value was unconditionally set to `false`). On agents in client-mode, this defaults to `true` and for agents in server-mode, this defaults to `false`.
@ -1820,6 +1827,12 @@ Valid time units are 'ns', 'us' (or 'µs'), 'ms', 's', 'm', 'h'."
- `retry_interval_wan` Equivalent to the [`-retry-interval-wan` command-line flag](#_retry_interval_wan).
- `rpc` configuration for Consul servers.
- `enable_streaming` ((#rpc_enable_streaming)) enables the gRPC subscribe endpoint on a Consul Server. All
servers in all federated datacenters must have this enabled before any client can use
[`cache.use_streaming_backend`](#use_streaming_backend). This setting will default to true in a future release of Consul.
- `segment` <EnterpriseAlert inline /> - Equivalent to the [`-segment` command-line flag](#_segment).
- `segments` <EnterpriseAlert inline /> - This is a list of nested objects