agent/proxy: local state event coalescing

pull/4275/head
Mitchell Hashimoto 2018-05-02 13:38:24 -07:00
parent b0f377b519
commit 39974df52a
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
2 changed files with 79 additions and 24 deletions

agent/proxy/manager.go

@@ -6,12 +6,26 @@ import (
 	"os"
 	"os/exec"
 	"sync"
+	"time"
 
 	"github.com/hashicorp/consul/agent/local"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/go-multierror"
 )
 
+const (
+	// ManagerCoalescePeriod and ManagerQuiescentPeriod relate to how
+	// notifications in updates from the local state are coalesced to prevent
+	// lots of churn in the manager.
+	//
+	// When the local state updates, the manager will wait for quiescence.
+	// For each update, the quiescence timer is reset. If the coalesce period
+	// is reached, the manager will update proxies regardless of the frequent
+	// changes. Then the whole cycle resets.
+	ManagerCoalescePeriod  = 5 * time.Second
+	ManagerQuiescentPeriod = 500 * time.Millisecond
+)
+
 // Manager starts, stops, snapshots, and restores managed proxies.
 //
 // The manager will not start or stop any processes until Start is called.
@@ -26,11 +40,9 @@ import (
 // and state updates may occur while the Manager is syncing. This is okay,
 // since a change notification will be queued to trigger another sync.
 //
-// NOTE(mitchellh): Change notifications are not coalesced currently. Under
-// conditions where managed proxy configurations are changing in a hot
-// loop, it is possible for the manager to constantly attempt to sync. This
-// is unlikely, but its also easy to introduce basic coalescing (even over
-// millisecond intervals) to prevent total waste compute cycles.
+// The change notifications from the local state are coalesced (see
+// ManagerCoalescePeriod) so that frequent changes within the local state
+// do not trigger dozens of proxy resyncs.
 type Manager struct {
 	// State is the local state that is the source of truth for all
 	// configured managed proxies.
@@ -41,6 +53,13 @@ type Manager struct {
 	// implementation type.
 	Logger *log.Logger
 
+	// CoalescePeriod and QuiescentPeriod control the timers for coalescing
+	// updates from the local state. See the defaults at the top of this
+	// file for more documentation. These will be set to those defaults
+	// by NewManager.
+	CoalescePeriod  time.Duration
+	QuiescentPeriod time.Duration
+
 	// lock is held while reading/writing any internal state of the manager.
 	// cond is a condition variable on lock that is broadcasted for runState
 	// changes.
@@ -55,22 +74,24 @@ type Manager struct {
 	proxies map[string]Proxy
 }
 
-// defaultLogger is the defaultLogger for NewManager so there it is never nil
-var defaultLogger = log.New(os.Stderr, "", log.LstdFlags)
-
 // NewManager initializes a Manager. After initialization, the exported
 // fields should be configured as desired. To start the Manager, execute
 // Run in a goroutine.
 func NewManager() *Manager {
 	var lock sync.Mutex
 	return &Manager{
-		Logger:  defaultLogger,
-		lock:    &lock,
-		cond:    sync.NewCond(&lock),
-		proxies: make(map[string]Proxy),
+		Logger:          defaultLogger,
+		CoalescePeriod:  ManagerCoalescePeriod,
+		QuiescentPeriod: ManagerQuiescentPeriod,
+		lock:            &lock,
+		cond:            sync.NewCond(&lock),
+		proxies:         make(map[string]Proxy),
 	}
 }
 
+// defaultLogger is the default logger for NewManager so that it is never nil.
+var defaultLogger = log.New(os.Stderr, "", log.LstdFlags)
+
 // managerRunState is the state of the Manager.
 //
 // This is a basic state machine with the following transitions:
@@ -193,19 +214,43 @@ func (m *Manager) Run() {
 	defer m.State.StopNotifyProxy(notifyCh)
 
 	m.Logger.Println("[DEBUG] agent/proxy: managed Connect proxy manager started")
+SYNC:
 	for {
 		// Sync first, before waiting on further notifications so that
 		// we can start with a known-current state.
 		m.sync()
 
-		select {
-		case <-notifyCh:
-			// Changes exit select so we can reloop and reconfigure proxies
+		// Note: we don't use a time.Timer for these because both periods are
+		// relatively short, so the channels from time.After become eligible
+		// for GC quickly and the overhead is not a concern.
+		var quiescent, quantum <-chan time.Time
 
-		case <-stopCh:
-			// Stop immediately, no cleanup
-			m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
-			return
+		// Start a loop waiting for events from the local state store. This
+		// loops rather than just `select` so we can coalesce many state
+		// updates over a period of time.
+		for {
+			select {
+			case <-notifyCh:
+				// If this is our first notification since the last sync,
+				// reset the quantum timer, which is the max time we'll wait.
+				if quantum == nil {
+					quantum = time.After(m.CoalescePeriod)
+				}
+
+				// Always reset the quiescent timer.
+				quiescent = time.After(m.QuiescentPeriod)
+
+			case <-quantum:
+				continue SYNC
+
+			case <-quiescent:
+				continue SYNC
+
+			case <-stopCh:
+				// Stop immediately, no cleanup
+				m.Logger.Println("[DEBUG] agent/proxy: Stopping managed Connect proxy manager")
+				return
+			}
 		}
 	}
 }
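
The control flow above is easier to follow in isolation. Below is a minimal, self-contained sketch of the same coalescing pattern, not Consul code: the coalesce function name, the struct{} channel types, and the periods are all illustrative. It leans on the fact that a nil channel blocks forever in a select, so the quiescent and quantum timer cases stay inert until the first notification after a sync arms them.

package main

import (
	"fmt"
	"time"
)

// coalesce calls sync, then waits for notifications on notifyCh. Each
// notification resets a short quiescent timer; the first notification after
// a sync also arms a longer quantum timer that caps the total wait. When
// either timer fires, the outer loop syncs again and both timers reset.
func coalesce(notifyCh, stopCh <-chan struct{}, coalescePeriod, quiescentPeriod time.Duration, sync func()) {
SYNC:
	for {
		sync()

		// Nil channels block forever in a select, so these cases are
		// disabled until the first notification arms them.
		var quiescent, quantum <-chan time.Time

		for {
			select {
			case <-notifyCh:
				if quantum == nil {
					quantum = time.After(coalescePeriod)
				}
				quiescent = time.After(quiescentPeriod)

			case <-quantum:
				continue SYNC
			case <-quiescent:
				continue SYNC
			case <-stopCh:
				return
			}
		}
	}
}

func main() {
	notifyCh := make(chan struct{}, 1)
	stopCh := make(chan struct{})

	go coalesce(notifyCh, stopCh, 50*time.Millisecond, 10*time.Millisecond, func() {
		fmt.Println("sync at", time.Now().Format("15:04:05.000"))
	})

	// A burst of 20 notifications should trigger only one or two extra
	// syncs rather than twenty.
	for i := 0; i < 20; i++ {
		notifyCh <- struct{}{}
		time.Sleep(time.Millisecond)
	}

	time.Sleep(100 * time.Millisecond)
	close(stopCh)
}

Using two timers means a hot loop of updates cannot starve syncing indefinitely: the quiescent timer handles the common case where updates settle down quickly, while the coalesce (quantum) timer bounds the worst case.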

agent/proxy/manager_test.go

@@ -17,7 +17,7 @@ func TestManagerClose_noRun(t *testing.T) {
 	t.Parallel()
 
 	// Really we're testing that it doesn't deadlock here.
-	m := NewManager()
+	m := testManager(t)
 	require.NoError(t, m.Close())
 
 	// Close again for sanity
@@ -30,7 +30,7 @@ func TestManagerRun_initialSync(t *testing.T) {
 	t.Parallel()
 
 	state := local.TestState(t)
-	m := NewManager()
+	m := testManager(t)
 	m.State = state
 	defer m.Kill()
@@ -57,7 +57,7 @@ func TestManagerRun_syncNew(t *testing.T) {
 	t.Parallel()
 
 	state := local.TestState(t)
-	m := NewManager()
+	m := testManager(t)
 	m.State = state
 	defer m.Kill()
@@ -99,7 +99,7 @@ func TestManagerRun_syncDelete(t *testing.T) {
 	t.Parallel()
 
 	state := local.TestState(t)
-	m := NewManager()
+	m := testManager(t)
 	m.State = state
 	defer m.Kill()
@@ -138,7 +138,7 @@ func TestManagerRun_syncUpdate(t *testing.T) {
 	t.Parallel()
 
 	state := local.TestState(t)
-	m := NewManager()
+	m := testManager(t)
 	m.State = state
 	defer m.Kill()
@@ -181,6 +181,16 @@ func TestManagerRun_syncUpdate(t *testing.T) {
 	})
 }
 
+func testManager(t *testing.T) *Manager {
+	m := NewManager()
+
+	// Set these periods low to speed up tests
+	m.CoalescePeriod = 1 * time.Millisecond
+	m.QuiescentPeriod = 1 * time.Millisecond
+
+	return m
+}
+
 // testStateProxy registers a proxy with the given local state and the command
 // (expected to be from the helperProcess function call). It returns the
 // ID for deregistration.
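
Because testManager drops both periods to 1ms, a test can exercise coalescing directly instead of waiting on the production 5s/500ms defaults. Below is a hypothetical, self-contained sketch of that style of test; it is not from the Consul suite, and the test name, periods, and thresholds are illustrative.

package main

import (
	"sync/atomic"
	"testing"
	"time"
)

// TestCoalesce_burst fires a burst of notifications at a coalescing loop
// with very short periods and asserts that the burst collapses into a small
// number of syncs rather than one sync per notification.
func TestCoalesce_burst(t *testing.T) {
	var syncs int32
	notifyCh := make(chan struct{}, 1)
	stopCh := make(chan struct{})

	go func() {
	SYNC:
		for {
			atomic.AddInt32(&syncs, 1)

			var quiescent, quantum <-chan time.Time
			for {
				select {
				case <-notifyCh:
					if quantum == nil {
						quantum = time.After(5 * time.Millisecond) // stand-in coalesce period
					}
					quiescent = time.After(1 * time.Millisecond) // stand-in quiescent period
				case <-quantum:
					continue SYNC
				case <-quiescent:
					continue SYNC
				case <-stopCh:
					return
				}
			}
		}
	}()

	// Fire a burst of notifications back to back.
	for i := 0; i < 50; i++ {
		notifyCh <- struct{}{}
	}

	// Give the quiescent timer time to fire, then stop.
	time.Sleep(50 * time.Millisecond)
	close(stopCh)

	if n := atomic.LoadInt32(&syncs); n < 2 || n > 10 {
		t.Fatalf("expected the burst to coalesce into a few syncs, got %d", n)
	}
}

The upper bound in the assertion is deliberately loose because timer scheduling is not deterministic; the point is only that 50 notifications do not produce 50 syncs.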