diff --git a/agent/agent.go b/agent/agent.go
index 4410ff2935..cacd085216 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -73,6 +73,7 @@ type delegate interface {
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error
Stats() map[string]map[string]string
+ ReloadConfig(config *consul.Config) error
enterpriseDelegate
}
@@ -2491,6 +2492,11 @@ func (a *Agent) DisableNodeMaintenance() {
a.logger.Printf("[INFO] agent: Node left maintenance mode")
}
+func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
+ a.config.RPCRateLimit = conf.RPCRateLimit
+ a.config.RPCMaxBurst = conf.RPCMaxBurst
+}
+
func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
// Bulk update the services and checks
a.PauseSync()
@@ -2525,6 +2531,18 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
return fmt.Errorf("Failed reloading watches: %v", err)
}
+ a.loadLimits(newCfg)
+
+ // create the config for the rpc server/client
+ consulCfg, err := a.consulConfig()
+ if err != nil {
+ return err
+ }
+
+ if err := a.delegate.ReloadConfig(consulCfg); err != nil {
+ return err
+ }
+
// Update filtered metrics
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)
diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go
index 1a62a84279..50b672b165 100644
--- a/agent/agent_endpoint_test.go
+++ b/agent/agent_endpoint_test.go
@@ -281,6 +281,10 @@ func TestAgent_Reload(t *testing.T) {
handler = "true"
}
]
+ limits = {
+ rpc_rate=1
+ rpc_max_burst=100
+ }
`)
defer a.Shutdown()
@@ -302,6 +306,10 @@ func TestAgent_Reload(t *testing.T) {
name = "redis-reloaded"
}
]
+ limits = {
+ rpc_rate=2
+ rpc_max_burst=200
+ }
`,
})
@@ -312,6 +320,14 @@ func TestAgent_Reload(t *testing.T) {
t.Fatal("missing redis-reloaded service")
}
+ if a.config.RPCRateLimit != 2 {
+ t.Fatalf("RPC rate not set correctly. Got %v. Want 2", a.config.RPCRateLimit)
+ }
+
+ if a.config.RPCMaxBurst != 200 {
+ t.Fatalf("RPC max burst not set correctly. Got %v. Want 200", a.config.RPCMaxBurst)
+ }
+
for _, wp := range a.watchPlans {
if !wp.IsStopped() {
t.Fatalf("Reloading configs should stop watch plans of the previous configuration")
diff --git a/agent/consul/client.go b/agent/consul/client.go
index 4dc1f33655..000cb66f88 100644
--- a/agent/consul/client.go
+++ b/agent/consul/client.go
@@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"
+ "sync/atomic"
"time"
"github.com/armon/go-metrics"
@@ -56,7 +57,7 @@ type Client struct {
// rpcLimiter is used to rate limit the total number of RPCs initiated
// from an agent.
- rpcLimiter *rate.Limiter
+ rpcLimiter atomic.Value
// eventCh is used to receive events from the
// serf cluster in the datacenter
@@ -128,12 +129,13 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
c := &Client{
config: config,
connPool: connPool,
- rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst),
eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger,
shutdownCh: make(chan struct{}),
}
+ c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
+
if err := c.initEnterprise(); err != nil {
c.Shutdown()
return nil, err
@@ -263,7 +265,7 @@ TRY:
// Enforce the RPC limit.
metrics.IncrCounter([]string{"client", "rpc"}, 1)
- if !c.rpcLimiter.Allow() {
+ if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}
@@ -306,7 +308,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
// Enforce the RPC limit.
metrics.IncrCounter([]string{"client", "rpc"}, 1)
- if !c.rpcLimiter.Allow() {
+ if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded
}
@@ -381,3 +383,10 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
cs := lib.CoordinateSet{c.config.Segment: lan}
return cs, nil
}
+
+// ReloadConfig is used to have the Client do an online reload of
+// relevant configuration information
+func (c *Client) ReloadConfig(config *Config) error {
+ c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
+ return nil
+}
diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go
index 20647b3f66..f61541b5ef 100644
--- a/agent/consul/client_test.go
+++ b/agent/consul/client_test.go
@@ -15,6 +15,8 @@ import (
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
+ "github.com/stretchr/testify/require"
+ "golang.org/x/time/rate"
)
func testClientConfig(t *testing.T) (string, *Config) {
@@ -665,3 +667,25 @@ func TestClient_Encrypted(t *testing.T) {
t.Fatalf("should be encrypted")
}
}
+
+func TestClient_Reload(t *testing.T) {
+ t.Parallel()
+ dir1, c := testClientWithConfig(t, func(c *Config) {
+ c.RPCRate = 500
+ c.RPCMaxBurst = 5000
+ })
+ defer os.RemoveAll(dir1)
+ defer c.Shutdown()
+
+ limiter := c.rpcLimiter.Load().(*rate.Limiter)
+ require.Equal(t, rate.Limit(500), limiter.Limit())
+ require.Equal(t, 5000, limiter.Burst())
+
+ c.config.RPCRate = 1000
+ c.config.RPCMaxBurst = 10000
+
+ require.NoError(t, c.ReloadConfig(c.config))
+ limiter = c.rpcLimiter.Load().(*rate.Limiter)
+ require.Equal(t, rate.Limit(1000), limiter.Limit())
+ require.Equal(t, 10000, limiter.Burst())
+}
diff --git a/agent/consul/server.go b/agent/consul/server.go
index 23fbf337c3..1205e63be6 100644
--- a/agent/consul/server.go
+++ b/agent/consul/server.go
@@ -1066,6 +1066,12 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
return cs, nil
}
+// ReloadConfig is used to have the Server do an online reload of
+// relevant configuration information
+func (s *Server) ReloadConfig(config *Config) error {
+ return nil
+}
+
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1)
diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md
index 3f259e9ef5..23e38edbf5 100644
--- a/website/source/docs/agent/options.html.md
+++ b/website/source/docs/agent/options.html.md
@@ -1302,3 +1302,4 @@ items which are reloaded include:
* Node Metadata
* Metric Prefix Filter
* Discard Check Output
+* RPC rate limiting
\ No newline at end of file