mirror of https://github.com/hashicorp/consul
Merge pull request #8680 from hashicorp/dnephin/replace-consul-opts-with-base-deps
agent: Repalce ConsulOptions with a new struct from agent.BaseDepspull/8745/head
commit
c18516ad7d
|
@ -18,7 +18,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/dns"
|
"github.com/hashicorp/consul/agent/dns"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/go-connlimit"
|
"github.com/hashicorp/go-connlimit"
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
|
@ -29,14 +28,12 @@ import (
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/ae"
|
"github.com/hashicorp/consul/agent/ae"
|
||||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||||
"github.com/hashicorp/consul/agent/checks"
|
"github.com/hashicorp/consul/agent/checks"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/local"
|
"github.com/hashicorp/consul/agent/local"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
|
||||||
"github.com/hashicorp/consul/agent/proxycfg"
|
"github.com/hashicorp/consul/agent/proxycfg"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/systemd"
|
"github.com/hashicorp/consul/agent/systemd"
|
||||||
|
@ -156,7 +153,8 @@ type notifier interface {
|
||||||
// mode, it runs a full Consul server. In client-only mode, it only forwards
|
// mode, it runs a full Consul server. In client-only mode, it only forwards
|
||||||
// requests to other Consul servers.
|
// requests to other Consul servers.
|
||||||
type Agent struct {
|
type Agent struct {
|
||||||
autoConf *autoconf.AutoConfig
|
// TODO: remove fields that are already in BaseDeps
|
||||||
|
baseDeps BaseDeps
|
||||||
|
|
||||||
// config is the agent configuration.
|
// config is the agent configuration.
|
||||||
config *config.RuntimeConfig
|
config *config.RuntimeConfig
|
||||||
|
@ -164,9 +162,6 @@ type Agent struct {
|
||||||
// Used for writing our logs
|
// Used for writing our logs
|
||||||
logger hclog.InterceptLogger
|
logger hclog.InterceptLogger
|
||||||
|
|
||||||
// In-memory sink used for collecting metrics
|
|
||||||
MemSink MetricsHandler
|
|
||||||
|
|
||||||
// delegate is either a *consul.Server or *consul.Client
|
// delegate is either a *consul.Server or *consul.Client
|
||||||
// depending on the configuration
|
// depending on the configuration
|
||||||
delegate delegate
|
delegate delegate
|
||||||
|
@ -295,12 +290,6 @@ type Agent struct {
|
||||||
// IP.
|
// IP.
|
||||||
httpConnLimiter connlimit.Limiter
|
httpConnLimiter connlimit.Limiter
|
||||||
|
|
||||||
// Connection Pool
|
|
||||||
connPool *pool.ConnPool
|
|
||||||
|
|
||||||
// Shared RPC Router
|
|
||||||
router *router.Router
|
|
||||||
|
|
||||||
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
|
// enterpriseAgent embeds fields that we only access in consul-enterprise builds
|
||||||
enterpriseAgent
|
enterpriseAgent
|
||||||
}
|
}
|
||||||
|
@ -337,16 +326,12 @@ func New(bd BaseDeps) (*Agent, error) {
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
endpoints: make(map[string]string),
|
endpoints: make(map[string]string),
|
||||||
|
|
||||||
// TODO: store the BaseDeps instead of copying them over to Agent
|
baseDeps: bd,
|
||||||
tokens: bd.Tokens,
|
tokens: bd.Tokens,
|
||||||
logger: bd.Logger,
|
logger: bd.Logger,
|
||||||
tlsConfigurator: bd.TLSConfigurator,
|
tlsConfigurator: bd.TLSConfigurator,
|
||||||
config: bd.RuntimeConfig,
|
config: bd.RuntimeConfig,
|
||||||
cache: bd.Cache,
|
cache: bd.Cache,
|
||||||
MemSink: bd.MetricsHandler,
|
|
||||||
connPool: bd.ConnPool,
|
|
||||||
autoConf: bd.AutoConfig,
|
|
||||||
router: bd.Router,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
a.serviceManager = NewServiceManager(&a)
|
a.serviceManager = NewServiceManager(&a)
|
||||||
|
@ -407,7 +392,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||||
|
|
||||||
// This needs to be done early on as it will potentially alter the configuration
|
// This needs to be done early on as it will potentially alter the configuration
|
||||||
// and then how other bits are brought up
|
// and then how other bits are brought up
|
||||||
c, err := a.autoConf.InitialConfiguration(ctx)
|
c, err := a.baseDeps.AutoConfig.InitialConfiguration(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -454,23 +439,15 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||||
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
|
return fmt.Errorf("failed to start Consul enterprise component: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
options := []consul.ConsulOption{
|
|
||||||
consul.WithLogger(a.logger),
|
|
||||||
consul.WithTokenStore(a.tokens),
|
|
||||||
consul.WithTLSConfigurator(a.tlsConfigurator),
|
|
||||||
consul.WithConnectionPool(a.connPool),
|
|
||||||
consul.WithRouter(a.router),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup either the client or the server.
|
// Setup either the client or the server.
|
||||||
if c.ServerMode {
|
if c.ServerMode {
|
||||||
server, err := consul.NewServer(consulCfg, options...)
|
server, err := consul.NewServer(consulCfg, a.baseDeps.Deps)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to start Consul server: %v", err)
|
return fmt.Errorf("Failed to start Consul server: %v", err)
|
||||||
}
|
}
|
||||||
a.delegate = server
|
a.delegate = server
|
||||||
} else {
|
} else {
|
||||||
client, err := consul.NewClient(consulCfg, options...)
|
client, err := consul.NewClient(consulCfg, a.baseDeps.Deps)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Failed to start Consul client: %v", err)
|
return fmt.Errorf("Failed to start Consul client: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -487,7 +464,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
||||||
a.State.Delegate = a.delegate
|
a.State.Delegate = a.delegate
|
||||||
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
a.State.TriggerSyncChanges = a.sync.SyncChanges.Trigger
|
||||||
|
|
||||||
if err := a.autoConf.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
|
if err := a.baseDeps.AutoConfig.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
|
||||||
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
|
return fmt.Errorf("AutoConf failed to start certificate monitor: %w", err)
|
||||||
}
|
}
|
||||||
a.serviceManager.Start()
|
a.serviceManager.Start()
|
||||||
|
@ -1297,7 +1274,7 @@ func (a *Agent) ShutdownAgent() error {
|
||||||
|
|
||||||
// this would be cancelled anyways (by the closing of the shutdown ch) but
|
// this would be cancelled anyways (by the closing of the shutdown ch) but
|
||||||
// this should help them to be stopped more quickly
|
// this should help them to be stopped more quickly
|
||||||
a.autoConf.Stop()
|
a.baseDeps.AutoConfig.Stop()
|
||||||
|
|
||||||
// Stop the service manager (must happen before we take the stateLock to avoid deadlock)
|
// Stop the service manager (must happen before we take the stateLock to avoid deadlock)
|
||||||
if a.serviceManager != nil {
|
if a.serviceManager != nil {
|
||||||
|
@ -3472,7 +3449,7 @@ func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
|
||||||
// all services, checks, tokens, metadata, dnsServer configs, etc.
|
// all services, checks, tokens, metadata, dnsServer configs, etc.
|
||||||
// It will also reload all ongoing watches.
|
// It will also reload all ongoing watches.
|
||||||
func (a *Agent) ReloadConfig() error {
|
func (a *Agent) ReloadConfig() error {
|
||||||
newCfg, err := a.autoConf.ReadConfig()
|
newCfg, err := a.baseDeps.AutoConfig.ReadConfig()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,7 +152,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request)
|
||||||
handler.ServeHTTP(resp, req)
|
handler.ServeHTTP(resp, req)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return s.agent.MemSink.DisplayMetrics(resp, req)
|
return s.agent.baseDeps.MetricsHandler.DisplayMetrics(resp, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||||
|
|
|
@ -4609,7 +4609,7 @@ func TestSharedRPCRouter(t *testing.T) {
|
||||||
|
|
||||||
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
|
testrpc.WaitForTestAgent(t, srv.RPC, "dc1")
|
||||||
|
|
||||||
mgr, server := srv.Agent.router.FindLANRoute()
|
mgr, server := srv.Agent.baseDeps.Router.FindLANRoute()
|
||||||
require.NotNil(t, mgr)
|
require.NotNil(t, mgr)
|
||||||
require.NotNil(t, server)
|
require.NotNil(t, server)
|
||||||
|
|
||||||
|
@ -4621,7 +4621,7 @@ func TestSharedRPCRouter(t *testing.T) {
|
||||||
|
|
||||||
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
||||||
|
|
||||||
mgr, server = client.Agent.router.FindLANRoute()
|
mgr, server = client.Agent.baseDeps.Router.FindLANRoute()
|
||||||
require.NotNil(t, mgr)
|
require.NotNil(t, mgr)
|
||||||
require.NotNil(t, server)
|
require.NotNil(t, server)
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,18 +22,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// clientRPCConnMaxIdle controls how long we keep an idle connection
|
|
||||||
// open to a server. 127s was chosen as the first prime above 120s
|
|
||||||
// (arbitrarily chose to use a prime) with the intent of reusing
|
|
||||||
// connections who are used by once-a-minute cron(8) jobs *and* who
|
|
||||||
// use a 60s jitter window (e.g. in vixie cron job execution can
|
|
||||||
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
|
||||||
clientRPCConnMaxIdle = 127 * time.Second
|
|
||||||
|
|
||||||
// clientMaxStreams controls how many idle streams we keep
|
|
||||||
// open to a server
|
|
||||||
clientMaxStreams = 32
|
|
||||||
|
|
||||||
// serfEventBacklog is the maximum number of unprocessed Serf Events
|
// serfEventBacklog is the maximum number of unprocessed Serf Events
|
||||||
// that will be held in queue before new serf events block. A
|
// that will be held in queue before new serf events block. A
|
||||||
// blocking serf event queue is a bad thing.
|
// blocking serf event queue is a bad thing.
|
||||||
|
@ -68,8 +56,7 @@ type Client struct {
|
||||||
// from an agent.
|
// from an agent.
|
||||||
rpcLimiter atomic.Value
|
rpcLimiter atomic.Value
|
||||||
|
|
||||||
// eventCh is used to receive events from the
|
// eventCh is used to receive events from the serf cluster in the datacenter
|
||||||
// serf cluster in the datacenter
|
|
||||||
eventCh chan serf.Event
|
eventCh chan serf.Event
|
||||||
|
|
||||||
// Logger uses the provided LogOutput
|
// Logger uses the provided LogOutput
|
||||||
|
@ -90,12 +77,7 @@ type Client struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient creates and returns a Client
|
// NewClient creates and returns a Client
|
||||||
func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
func NewClient(config *Config, deps Deps) (*Client, error) {
|
||||||
flat := flattenConsulOptions(options)
|
|
||||||
|
|
||||||
tlsConfigurator := flat.tlsConfigurator
|
|
||||||
connPool := flat.connPool
|
|
||||||
|
|
||||||
if err := config.CheckProtocolVersion(); err != nil {
|
if err := config.CheckProtocolVersion(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -105,32 +87,14 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
||||||
if err := config.CheckACL(); err != nil {
|
if err := config.CheckACL(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if flat.logger == nil {
|
|
||||||
return nil, fmt.Errorf("logger is required")
|
|
||||||
}
|
|
||||||
|
|
||||||
if connPool == nil {
|
|
||||||
connPool = &pool.ConnPool{
|
|
||||||
Server: false,
|
|
||||||
SrcAddr: config.RPCSrcAddr,
|
|
||||||
Logger: flat.logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
|
||||||
MaxTime: clientRPCConnMaxIdle,
|
|
||||||
MaxStreams: clientMaxStreams,
|
|
||||||
TLSConfigurator: tlsConfigurator,
|
|
||||||
Datacenter: config.Datacenter,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger := flat.logger.NamedIntercept(logging.ConsulClient)
|
|
||||||
|
|
||||||
// Create client
|
|
||||||
c := &Client{
|
c := &Client{
|
||||||
config: config,
|
config: config,
|
||||||
connPool: connPool,
|
connPool: deps.ConnPool,
|
||||||
eventCh: make(chan serf.Event, serfEventBacklog),
|
eventCh: make(chan serf.Event, serfEventBacklog),
|
||||||
logger: logger,
|
logger: deps.Logger.NamedIntercept(logging.ConsulClient),
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
tlsConfigurator: tlsConfigurator,
|
tlsConfigurator: deps.TLSConfigurator,
|
||||||
}
|
}
|
||||||
|
|
||||||
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
|
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
|
||||||
|
@ -156,23 +120,17 @@ func NewClient(config *Config, options ...ConsulOption) (*Client, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize the LAN Serf
|
// Initialize the LAN Serf
|
||||||
c.serf, err = c.setupSerf(config.SerfLANConfig,
|
c.serf, err = c.setupSerf(config.SerfLANConfig, c.eventCh, serfLANSnapshot)
|
||||||
c.eventCh, serfLANSnapshot)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.Shutdown()
|
c.Shutdown()
|
||||||
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
return nil, fmt.Errorf("Failed to start lan serf: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcRouter := flat.router
|
if err := deps.Router.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
|
||||||
if rpcRouter == nil {
|
|
||||||
rpcRouter = router.NewRouter(logger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rpcRouter.AddArea(types.AreaLAN, c.serf, c.connPool); err != nil {
|
|
||||||
c.Shutdown()
|
c.Shutdown()
|
||||||
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
|
return nil, fmt.Errorf("Failed to add LAN area to the RPC router: %w", err)
|
||||||
}
|
}
|
||||||
c.router = rpcRouter
|
c.router = deps.Router
|
||||||
|
|
||||||
// Start LAN event handlers after the router is complete since the event
|
// Start LAN event handlers after the router is complete since the event
|
||||||
// handlers depend on the router and the router depends on Serf.
|
// handlers depend on the router and the router depends on Serf.
|
||||||
|
|
|
@ -2,13 +2,17 @@ package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
|
"github.com/hashicorp/consul/agent/router"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/consul/agent/token"
|
||||||
"github.com/hashicorp/consul/sdk/freeport"
|
"github.com/hashicorp/consul/sdk/freeport"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
|
@ -64,18 +68,8 @@ func testClientWithConfigWithErr(t *testing.T, cb func(c *Config)) (string, *Cli
|
||||||
if cb != nil {
|
if cb != nil {
|
||||||
cb(config)
|
cb(config)
|
||||||
}
|
}
|
||||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
|
||||||
Name: config.NodeName,
|
|
||||||
Level: hclog.Debug,
|
|
||||||
Output: testutil.NewLogBuffer(t),
|
|
||||||
})
|
|
||||||
|
|
||||||
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
|
client, err := NewClient(config, newDefaultDeps(t, config))
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(tlsConf))
|
|
||||||
return dir, client, err
|
return dir, client, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,14 +460,7 @@ func TestClient_RPC_TLS(t *testing.T) {
|
||||||
func newClient(t *testing.T, config *Config) *Client {
|
func newClient(t *testing.T, config *Config) *Client {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
c, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), nil)
|
client, err := NewClient(config, newDefaultDeps(t, config))
|
||||||
require.NoError(t, err, "failed to create tls configuration")
|
|
||||||
|
|
||||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
|
||||||
Level: hclog.Debug,
|
|
||||||
Output: testutil.NewLogBuffer(t),
|
|
||||||
})
|
|
||||||
client, err := NewClient(config, WithLogger(logger), WithTLSConfigurator(c))
|
|
||||||
require.NoError(t, err, "failed to create client")
|
require.NoError(t, err, "failed to create client")
|
||||||
t.Cleanup(func() {
|
t.Cleanup(func() {
|
||||||
client.Shutdown()
|
client.Shutdown()
|
||||||
|
@ -481,6 +468,39 @@ func newClient(t *testing.T, config *Config) *Client {
|
||||||
return client
|
return client
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newDefaultDeps(t *testing.T, c *Config) Deps {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
||||||
|
Name: c.NodeName,
|
||||||
|
Level: hclog.Debug,
|
||||||
|
Output: testutil.NewLogBuffer(t),
|
||||||
|
})
|
||||||
|
|
||||||
|
tls, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
|
||||||
|
require.NoError(t, err, "failed to create tls configuration")
|
||||||
|
|
||||||
|
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter))
|
||||||
|
|
||||||
|
connPool := &pool.ConnPool{
|
||||||
|
Server: false,
|
||||||
|
SrcAddr: c.RPCSrcAddr,
|
||||||
|
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
||||||
|
MaxTime: 2 * time.Minute,
|
||||||
|
MaxStreams: 4,
|
||||||
|
TLSConfigurator: tls,
|
||||||
|
Datacenter: c.Datacenter,
|
||||||
|
}
|
||||||
|
|
||||||
|
return Deps{
|
||||||
|
Logger: logger,
|
||||||
|
TLSConfigurator: tls,
|
||||||
|
Tokens: new(token.Store),
|
||||||
|
Router: r,
|
||||||
|
ConnPool: connPool,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestClient_RPC_RateLimit(t *testing.T) {
|
func TestClient_RPC_RateLimit(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
_, conf1 := testServerConfig(t)
|
_, conf1 := testServerConfig(t)
|
||||||
|
|
|
@ -10,12 +10,10 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil"
|
"github.com/hashicorp/consul/sdk/testutil"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
|
@ -1303,12 +1301,11 @@ func TestLeader_ConfigEntryBootstrap_Fail(t *testing.T) {
|
||||||
Level: hclog.Debug,
|
Level: hclog.Debug,
|
||||||
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
|
Output: io.MultiWriter(pw, testutil.NewLogBuffer(t)),
|
||||||
})
|
})
|
||||||
tlsConf, err := tlsutil.NewConfigurator(config.ToTLSUtilConfig(), logger)
|
|
||||||
require.NoError(t, err)
|
deps := newDefaultDeps(t, config)
|
||||||
srv, err := NewServer(config,
|
deps.Logger = logger
|
||||||
WithLogger(logger),
|
|
||||||
WithTokenStore(new(token.Store)),
|
srv, err := NewServer(config, deps)
|
||||||
WithTLSConfigurator(tlsConf))
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
defer srv.Shutdown()
|
defer srv.Shutdown()
|
||||||
|
|
||||||
|
|
|
@ -8,50 +8,10 @@ import (
|
||||||
"github.com/hashicorp/go-hclog"
|
"github.com/hashicorp/go-hclog"
|
||||||
)
|
)
|
||||||
|
|
||||||
type consulOptions struct {
|
type Deps struct {
|
||||||
logger hclog.InterceptLogger
|
Logger hclog.InterceptLogger
|
||||||
tlsConfigurator *tlsutil.Configurator
|
TLSConfigurator *tlsutil.Configurator
|
||||||
connPool *pool.ConnPool
|
Tokens *token.Store
|
||||||
tokens *token.Store
|
Router *router.Router
|
||||||
router *router.Router
|
ConnPool *pool.ConnPool
|
||||||
}
|
|
||||||
|
|
||||||
type ConsulOption func(*consulOptions)
|
|
||||||
|
|
||||||
func WithLogger(logger hclog.InterceptLogger) ConsulOption {
|
|
||||||
return func(opt *consulOptions) {
|
|
||||||
opt.logger = logger
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithTLSConfigurator(tlsConfigurator *tlsutil.Configurator) ConsulOption {
|
|
||||||
return func(opt *consulOptions) {
|
|
||||||
opt.tlsConfigurator = tlsConfigurator
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithConnectionPool(connPool *pool.ConnPool) ConsulOption {
|
|
||||||
return func(opt *consulOptions) {
|
|
||||||
opt.connPool = connPool
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithTokenStore(tokens *token.Store) ConsulOption {
|
|
||||||
return func(opt *consulOptions) {
|
|
||||||
opt.tokens = tokens
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithRouter(router *router.Router) ConsulOption {
|
|
||||||
return func(opt *consulOptions) {
|
|
||||||
opt.router = router
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func flattenConsulOptions(options []ConsulOption) consulOptions {
|
|
||||||
var flat consulOptions
|
|
||||||
for _, opt := range options {
|
|
||||||
opt(&flat)
|
|
||||||
}
|
|
||||||
return flat
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -70,14 +70,6 @@ const (
|
||||||
raftState = "raft/"
|
raftState = "raft/"
|
||||||
snapshotsRetained = 2
|
snapshotsRetained = 2
|
||||||
|
|
||||||
// serverRPCCache controls how long we keep an idle connection
|
|
||||||
// open to a server
|
|
||||||
serverRPCCache = 2 * time.Minute
|
|
||||||
|
|
||||||
// serverMaxStreams controls how many idle streams we keep
|
|
||||||
// open to a server
|
|
||||||
serverMaxStreams = 64
|
|
||||||
|
|
||||||
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
// raftLogCacheSize is the maximum number of logs to cache in-memory.
|
||||||
// This is used to reduce disk I/O for the recently committed entries.
|
// This is used to reduce disk I/O for the recently committed entries.
|
||||||
raftLogCacheSize = 512
|
raftLogCacheSize = 512
|
||||||
|
@ -324,15 +316,8 @@ type connHandler interface {
|
||||||
|
|
||||||
// NewServer is used to construct a new Consul server from the configuration
|
// NewServer is used to construct a new Consul server from the configuration
|
||||||
// and extra options, potentially returning an error.
|
// and extra options, potentially returning an error.
|
||||||
func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
func NewServer(config *Config, flat Deps) (*Server, error) {
|
||||||
flat := flattenConsulOptions(options)
|
logger := flat.Logger
|
||||||
|
|
||||||
logger := flat.logger
|
|
||||||
tokens := flat.tokens
|
|
||||||
tlsConfigurator := flat.tlsConfigurator
|
|
||||||
connPool := flat.connPool
|
|
||||||
rpcRouter := flat.router
|
|
||||||
|
|
||||||
if err := config.CheckProtocolVersion(); err != nil {
|
if err := config.CheckProtocolVersion(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -342,9 +327,6 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
||||||
if err := config.CheckACL(); err != nil {
|
if err := config.CheckACL(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if logger == nil {
|
|
||||||
return nil, fmt.Errorf("logger is required")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if TLS is enabled
|
// Check if TLS is enabled
|
||||||
if config.CAFile != "" || config.CAPath != "" {
|
if config.CAFile != "" || config.CAPath != "" {
|
||||||
|
@ -373,40 +355,24 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {
|
||||||
// Create the shutdown channel - this is closed but never written to.
|
// Create the shutdown channel - this is closed but never written to.
|
||||||
shutdownCh := make(chan struct{})
|
shutdownCh := make(chan struct{})
|
||||||
|
|
||||||
if connPool == nil {
|
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
|
||||||
connPool = &pool.ConnPool{
|
|
||||||
Server: true,
|
|
||||||
SrcAddr: config.RPCSrcAddr,
|
|
||||||
Logger: logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
|
|
||||||
MaxTime: serverRPCCache,
|
|
||||||
MaxStreams: serverMaxStreams,
|
|
||||||
TLSConfigurator: tlsConfigurator,
|
|
||||||
Datacenter: config.Datacenter,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
serverLogger := logger.NamedIntercept(logging.ConsulServer)
|
|
||||||
loggers := newLoggerStore(serverLogger)
|
loggers := newLoggerStore(serverLogger)
|
||||||
|
|
||||||
if rpcRouter == nil {
|
|
||||||
rpcRouter = router.NewRouter(serverLogger, config.Datacenter, fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create server.
|
// Create server.
|
||||||
s := &Server{
|
s := &Server{
|
||||||
config: config,
|
config: config,
|
||||||
tokens: tokens,
|
tokens: flat.Tokens,
|
||||||
connPool: connPool,
|
connPool: flat.ConnPool,
|
||||||
eventChLAN: make(chan serf.Event, serfEventChSize),
|
eventChLAN: make(chan serf.Event, serfEventChSize),
|
||||||
eventChWAN: make(chan serf.Event, serfEventChSize),
|
eventChWAN: make(chan serf.Event, serfEventChSize),
|
||||||
logger: serverLogger,
|
logger: serverLogger,
|
||||||
loggers: loggers,
|
loggers: loggers,
|
||||||
leaveCh: make(chan struct{}),
|
leaveCh: make(chan struct{}),
|
||||||
reconcileCh: make(chan serf.Member, reconcileChSize),
|
reconcileCh: make(chan serf.Member, reconcileChSize),
|
||||||
router: rpcRouter,
|
router: flat.Router,
|
||||||
rpcServer: rpc.NewServer(),
|
rpcServer: rpc.NewServer(),
|
||||||
insecureRPCServer: rpc.NewServer(),
|
insecureRPCServer: rpc.NewServer(),
|
||||||
tlsConfigurator: tlsConfigurator,
|
tlsConfigurator: flat.TLSConfigurator,
|
||||||
reassertLeaderCh: make(chan chan error),
|
reassertLeaderCh: make(chan chan error),
|
||||||
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
|
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
|
||||||
sessionTimers: NewSessionTimers(),
|
sessionTimers: NewSessionTimers(),
|
||||||
|
|
|
@ -30,7 +30,6 @@ import (
|
||||||
"github.com/hashicorp/consul/testrpc"
|
"github.com/hashicorp/consul/testrpc"
|
||||||
"github.com/hashicorp/consul/tlsutil"
|
"github.com/hashicorp/consul/tlsutil"
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-hclog"
|
|
||||||
"github.com/hashicorp/go-uuid"
|
"github.com/hashicorp/go-uuid"
|
||||||
"golang.org/x/time/rate"
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
|
@ -292,19 +291,7 @@ func newServer(t *testing.T, c *Config) (*Server, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{
|
srv, err := NewServer(c, newDefaultDeps(t, c))
|
||||||
Name: c.NodeName,
|
|
||||||
Level: hclog.Debug,
|
|
||||||
Output: testutil.NewLogBuffer(t),
|
|
||||||
})
|
|
||||||
tlsConf, err := tlsutil.NewConfigurator(c.ToTLSUtilConfig(), logger)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
srv, err := NewServer(c,
|
|
||||||
WithLogger(logger),
|
|
||||||
WithTokenStore(new(token.Store)),
|
|
||||||
WithTLSConfigurator(tlsConf))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -1488,16 +1475,11 @@ func TestServer_CALogging(t *testing.T) {
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
logger := testutil.LoggerWithOutput(t, &buf)
|
logger := testutil.LoggerWithOutput(t, &buf)
|
||||||
|
|
||||||
c, err := tlsutil.NewConfigurator(conf1.ToTLSUtilConfig(), logger)
|
deps := newDefaultDeps(t, conf1)
|
||||||
require.NoError(t, err)
|
deps.Logger = logger
|
||||||
|
|
||||||
s1, err := NewServer(conf1,
|
s1, err := NewServer(conf1, deps)
|
||||||
WithLogger(logger),
|
require.NoError(t, err)
|
||||||
WithTokenStore(new(token.Store)),
|
|
||||||
WithTLSConfigurator(c))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
defer s1.Shutdown()
|
defer s1.Shutdown()
|
||||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import (
|
||||||
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
autoconf "github.com/hashicorp/consul/agent/auto-config"
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/pool"
|
"github.com/hashicorp/consul/agent/pool"
|
||||||
"github.com/hashicorp/consul/agent/router"
|
"github.com/hashicorp/consul/agent/router"
|
||||||
"github.com/hashicorp/consul/agent/token"
|
"github.com/hashicorp/consul/agent/token"
|
||||||
|
@ -25,15 +26,12 @@ import (
|
||||||
// has been moved out in front of Agent.New, and we can better see the setup
|
// has been moved out in front of Agent.New, and we can better see the setup
|
||||||
// dependencies.
|
// dependencies.
|
||||||
type BaseDeps struct {
|
type BaseDeps struct {
|
||||||
Logger hclog.InterceptLogger
|
consul.Deps // TODO: un-embed
|
||||||
TLSConfigurator *tlsutil.Configurator // TODO: use an interface
|
|
||||||
MetricsHandler MetricsHandler
|
RuntimeConfig *config.RuntimeConfig
|
||||||
RuntimeConfig *config.RuntimeConfig
|
MetricsHandler MetricsHandler
|
||||||
Tokens *token.Store
|
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
||||||
Cache *cache.Cache
|
Cache *cache.Cache
|
||||||
AutoConfig *autoconf.AutoConfig // TODO: use an interface
|
|
||||||
ConnPool *pool.ConnPool // TODO: use an interface
|
|
||||||
Router *router.Router
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MetricsHandler provides an http.Handler for displaying metrics.
|
// MetricsHandler provides an http.Handler for displaying metrics.
|
||||||
|
@ -120,6 +118,12 @@ func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil
|
||||||
pool.MaxTime = 2 * time.Minute
|
pool.MaxTime = 2 * time.Minute
|
||||||
pool.MaxStreams = 64
|
pool.MaxStreams = 64
|
||||||
} else {
|
} else {
|
||||||
|
// MaxTime controls how long we keep an idle connection open to a server.
|
||||||
|
// 127s was chosen as the first prime above 120s
|
||||||
|
// (arbitrarily chose to use a prime) with the intent of reusing
|
||||||
|
// connections who are used by once-a-minute cron(8) jobs *and* who
|
||||||
|
// use a 60s jitter window (e.g. in vixie cron job execution can
|
||||||
|
// drift by up to 59s per job, or 119s for a once-a-minute cron job).
|
||||||
pool.MaxTime = 127 * time.Second
|
pool.MaxTime = 127 * time.Second
|
||||||
pool.MaxStreams = 32
|
pool.MaxStreams = 32
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue