mirror of https://github.com/hashicorp/consul
commit
af910bda39
|
@ -73,6 +73,7 @@ type delegate interface {
|
||||||
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
|
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
|
||||||
Shutdown() error
|
Shutdown() error
|
||||||
Stats() map[string]map[string]string
|
Stats() map[string]map[string]string
|
||||||
|
ReloadConfig(config *consul.Config) error
|
||||||
enterpriseDelegate
|
enterpriseDelegate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2491,6 +2492,11 @@ func (a *Agent) DisableNodeMaintenance() {
|
||||||
a.logger.Printf("[INFO] agent: Node left maintenance mode")
|
a.logger.Printf("[INFO] agent: Node left maintenance mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
|
||||||
|
a.config.RPCRateLimit = conf.RPCRateLimit
|
||||||
|
a.config.RPCMaxBurst = conf.RPCMaxBurst
|
||||||
|
}
|
||||||
|
|
||||||
func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
||||||
// Bulk update the services and checks
|
// Bulk update the services and checks
|
||||||
a.PauseSync()
|
a.PauseSync()
|
||||||
|
@ -2525,6 +2531,18 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
|
||||||
return fmt.Errorf("Failed reloading watches: %v", err)
|
return fmt.Errorf("Failed reloading watches: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
a.loadLimits(newCfg)
|
||||||
|
|
||||||
|
// create the config for the rpc server/client
|
||||||
|
consulCfg, err := a.consulConfig()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := a.delegate.ReloadConfig(consulCfg); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Update filtered metrics
|
// Update filtered metrics
|
||||||
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)
|
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)
|
||||||
|
|
||||||
|
|
|
@ -281,6 +281,10 @@ func TestAgent_Reload(t *testing.T) {
|
||||||
handler = "true"
|
handler = "true"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
limits = {
|
||||||
|
rpc_rate=1
|
||||||
|
rpc_max_burst=100
|
||||||
|
}
|
||||||
`)
|
`)
|
||||||
defer a.Shutdown()
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
@ -302,6 +306,10 @@ func TestAgent_Reload(t *testing.T) {
|
||||||
name = "redis-reloaded"
|
name = "redis-reloaded"
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
limits = {
|
||||||
|
rpc_rate=2
|
||||||
|
rpc_max_burst=200
|
||||||
|
}
|
||||||
`,
|
`,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -312,6 +320,14 @@ func TestAgent_Reload(t *testing.T) {
|
||||||
t.Fatal("missing redis-reloaded service")
|
t.Fatal("missing redis-reloaded service")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.config.RPCRateLimit != 2 {
|
||||||
|
t.Fatalf("RPC rate not set correctly. Got %v. Want 2", a.config.RPCRateLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
if a.config.RPCMaxBurst != 200 {
|
||||||
|
t.Fatalf("RPC max burst not set correctly. Got %v. Want 200", a.config.RPCMaxBurst)
|
||||||
|
}
|
||||||
|
|
||||||
for _, wp := range a.watchPlans {
|
for _, wp := range a.watchPlans {
|
||||||
if !wp.IsStopped() {
|
if !wp.IsStopped() {
|
||||||
t.Fatalf("Reloading configs should stop watch plans of the previous configuration")
|
t.Fatalf("Reloading configs should stop watch plans of the previous configuration")
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
|
@ -56,7 +57,7 @@ type Client struct {
|
||||||
|
|
||||||
// rpcLimiter is used to rate limit the total number of RPCs initiated
|
// rpcLimiter is used to rate limit the total number of RPCs initiated
|
||||||
// from an agent.
|
// from an agent.
|
||||||
rpcLimiter *rate.Limiter
|
rpcLimiter atomic.Value
|
||||||
|
|
||||||
// eventCh is used to receive events from the
|
// eventCh is used to receive events from the
|
||||||
// serf cluster in the datacenter
|
// serf cluster in the datacenter
|
||||||
|
@ -128,12 +129,13 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
|
||||||
c := &Client{
|
c := &Client{
|
||||||
config: config,
|
config: config,
|
||||||
connPool: connPool,
|
connPool: connPool,
|
||||||
rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst),
|
|
||||||
eventCh: make(chan serf.Event, serfEventBacklog),
|
eventCh: make(chan serf.Event, serfEventBacklog),
|
||||||
logger: logger,
|
logger: logger,
|
||||||
shutdownCh: make(chan struct{}),
|
shutdownCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
|
||||||
|
|
||||||
if err := c.initEnterprise(); err != nil {
|
if err := c.initEnterprise(); err != nil {
|
||||||
c.Shutdown()
|
c.Shutdown()
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -263,7 +265,7 @@ TRY:
|
||||||
|
|
||||||
// Enforce the RPC limit.
|
// Enforce the RPC limit.
|
||||||
metrics.IncrCounter([]string{"client", "rpc"}, 1)
|
metrics.IncrCounter([]string{"client", "rpc"}, 1)
|
||||||
if !c.rpcLimiter.Allow() {
|
if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
|
||||||
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
|
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
|
||||||
return structs.ErrRPCRateExceeded
|
return structs.ErrRPCRateExceeded
|
||||||
}
|
}
|
||||||
|
@ -306,7 +308,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
|
||||||
|
|
||||||
// Enforce the RPC limit.
|
// Enforce the RPC limit.
|
||||||
metrics.IncrCounter([]string{"client", "rpc"}, 1)
|
metrics.IncrCounter([]string{"client", "rpc"}, 1)
|
||||||
if !c.rpcLimiter.Allow() {
|
if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
|
||||||
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
|
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
|
||||||
return structs.ErrRPCRateExceeded
|
return structs.ErrRPCRateExceeded
|
||||||
}
|
}
|
||||||
|
@ -381,3 +383,10 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
|
||||||
cs := lib.CoordinateSet{c.config.Segment: lan}
|
cs := lib.CoordinateSet{c.config.Segment: lan}
|
||||||
return cs, nil
|
return cs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReloadConfig is used to have the Client do an online reload of
|
||||||
|
// relevant configuration information
|
||||||
|
func (c *Client) ReloadConfig(config *Config) error {
|
||||||
|
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
|
@ -15,6 +15,8 @@ import (
|
||||||
"github.com/hashicorp/consul/testutil/retry"
|
"github.com/hashicorp/consul/testutil/retry"
|
||||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||||
"github.com/hashicorp/serf/serf"
|
"github.com/hashicorp/serf/serf"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
)
|
)
|
||||||
|
|
||||||
func testClientConfig(t *testing.T) (string, *Config) {
|
func testClientConfig(t *testing.T) (string, *Config) {
|
||||||
|
@ -665,3 +667,25 @@ func TestClient_Encrypted(t *testing.T) {
|
||||||
t.Fatalf("should be encrypted")
|
t.Fatalf("should be encrypted")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClient_Reload(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
dir1, c := testClientWithConfig(t, func(c *Config) {
|
||||||
|
c.RPCRate = 500
|
||||||
|
c.RPCMaxBurst = 5000
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer c.Shutdown()
|
||||||
|
|
||||||
|
limiter := c.rpcLimiter.Load().(*rate.Limiter)
|
||||||
|
require.Equal(t, rate.Limit(500), limiter.Limit())
|
||||||
|
require.Equal(t, 5000, limiter.Burst())
|
||||||
|
|
||||||
|
c.config.RPCRate = 1000
|
||||||
|
c.config.RPCMaxBurst = 10000
|
||||||
|
|
||||||
|
require.NoError(t, c.ReloadConfig(c.config))
|
||||||
|
limiter = c.rpcLimiter.Load().(*rate.Limiter)
|
||||||
|
require.Equal(t, rate.Limit(1000), limiter.Limit())
|
||||||
|
require.Equal(t, 10000, limiter.Burst())
|
||||||
|
}
|
||||||
|
|
|
@ -1066,6 +1066,12 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
|
||||||
return cs, nil
|
return cs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ReloadConfig is used to have the Server do an online reload of
|
||||||
|
// relevant configuration information
|
||||||
|
func (s *Server) ReloadConfig(config *Config) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
|
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
|
||||||
func (s *Server) setConsistentReadReady() {
|
func (s *Server) setConsistentReadReady() {
|
||||||
atomic.StoreInt32(&s.readyForConsistentReads, 1)
|
atomic.StoreInt32(&s.readyForConsistentReads, 1)
|
||||||
|
|
|
@ -1302,3 +1302,4 @@ items which are reloaded include:
|
||||||
* <a href="#node_meta">Node Metadata</a>
|
* <a href="#node_meta">Node Metadata</a>
|
||||||
* <a href="#telemetry-prefix_filter">Metric Prefix Filter</a>
|
* <a href="#telemetry-prefix_filter">Metric Prefix Filter</a>
|
||||||
* <a href="#discard_check_output">Discard Check Output</a>
|
* <a href="#discard_check_output">Discard Check Output</a>
|
||||||
|
* <a href="#limits">RPC rate limiting</a>
|
Loading…
Reference in New Issue