You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/ae/ae.go

361 lines
10 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
// Package ae provides tools to synchronize state between local and remote consul servers.
package ae
import (
"fmt"
"math"
"sync"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
)
// scaleThreshold is the number of nodes after which regular sync runs are
// spread out farther apart. The value should be a power of 2 since the
// scale function uses log2.
//
// When set to 128 nodes the delay between regular runs is doubled when the
// cluster is larger than 128 nodes. It doubles again when it passes 256
// nodes, and again at 512 nodes and so forth. At 8192 nodes, the delay
// factor is 8.
//
// If you update this, you may need to adjust the tuning of
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
const scaleThreshold = 128
// scaleFactor returns a factor by which the next sync run should be delayed to
// avoid saturation of the cluster. The larger the cluster grows the farther
// the sync runs should be spread apart.
//
// The current implementation uses a log2 scale which doubles the delay between
// runs every time the cluster doubles in size.
func scaleFactor(nodes int) int {
if nodes <= scaleThreshold {
return 1.0
}
return int(math.Ceil(math.Log2(float64(nodes))-math.Log2(float64(scaleThreshold))) + 1.0)
}
type SyncState interface {
SyncChanges() error
SyncFull() error
}
// StateSyncer manages background synchronization of the given state.
//
// The state is synchronized on a regular basis or on demand when either
// the state has changed or a new Consul server has joined the cluster.
//
// The regular state synchronization provides a self-healing mechanism
// for the cluster which is also called anti-entropy.
type StateSyncer struct {
// State contains the data that needs to be synchronized.
State SyncState
// Interval is the time between two full sync runs.
Interval time.Duration
// ShutdownCh is closed when the application is shutting down.
ShutdownCh chan struct{}
// Logger is the logger.
Logger hclog.Logger
// ClusterSize returns the number of members in the cluster to
// allow staggering the sync runs based on cluster size.
// This needs to be set before Run() is called.
ClusterSize func() int
// SyncFull allows triggering an immediate but staggered full sync
// in a non-blocking way.
SyncFull *Trigger
// SyncChanges allows triggering an immediate partial sync
// in a non-blocking way.
SyncChanges *Trigger
// paused stores whether sync runs are temporarily disabled.
pauseLock sync.Mutex
paused int
hardDisabled bool
// serverUpInterval is the max time after which a full sync is
// performed when a server has been added to the cluster.
serverUpInterval time.Duration
// retryFailInterval is the time after which a failed full sync is retried.
retryFailInterval time.Duration
// stagger randomly picks a duration between 0s and the given duration.
stagger func(time.Duration) time.Duration
// retrySyncFullEvent generates an event based on multiple conditions
// when the state machine is trying to retry a full state sync.
retrySyncFullEvent func() event
// syncChangesEvent generates an event based on multiple conditions
// when the state machine is performing partial state syncs.
syncChangesEvent func() event
// nextFullSyncCh is a chan that receives a time.Time when the next
// full sync should occur.
nextFullSyncCh <-chan time.Time
}
const (
// serverUpIntv is the max time to wait before a sync is triggered
// when a consul server has been added to the cluster.
serverUpIntv = 3 * time.Second
// retryFailIntv is the min time to wait before a failed sync is retried.
retryFailIntv = 15 * time.Second
)
func NewStateSyncer(state SyncState, intv time.Duration, shutdownCh chan struct{}, logger hclog.Logger) *StateSyncer {
if logger == nil {
logger = hclog.New(&hclog.LoggerOptions{})
}
s := &StateSyncer{
State: state,
Interval: intv,
ShutdownCh: shutdownCh,
Logger: logger.Named(logging.AntiEntropy),
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
serverUpInterval: serverUpIntv,
retryFailInterval: retryFailIntv,
}
// retain these methods as member variables so that
// we can mock them for testing.
s.retrySyncFullEvent = s.retrySyncFullEventFn
s.syncChangesEvent = s.syncChangesEventFn
s.stagger = s.staggerFn
return s
}
// fsmState defines states for the state machine.
type fsmState string
const (
doneState fsmState = "done"
fullSyncState fsmState = "fullSync"
partialSyncState fsmState = "partialSync"
retryFullSyncState fsmState = "retryFullSync"
)
// Run is the long running method to perform state synchronization
// between local and remote servers.
func (s *StateSyncer) Run() {
if s.Disabled() {
return
}
if s.ClusterSize == nil {
panic("ClusterSize not set")
}
s.resetNextFullSyncCh()
s.runFSM(fullSyncState, s.nextFSMState)
}
// runFSM runs the state machine.
func (s *StateSyncer) runFSM(fs fsmState, next func(fsmState) fsmState) {
for {
if fs = next(fs); fs == doneState {
return
}
}
}
// nextFSMState determines the next state based on the current state.
func (s *StateSyncer) nextFSMState(fs fsmState) fsmState {
switch fs {
case fullSyncState:
if s.Paused() {
return retryFullSyncState
}
if err := s.State.SyncFull(); err != nil {
s.Logger.Error("failed to sync remote state", "error", err)
return retryFullSyncState
}
return partialSyncState
case retryFullSyncState:
e := s.retrySyncFullEvent()
switch e {
case syncFullNotifEvent, syncFullTimerEvent:
return fullSyncState
case shutdownEvent:
return doneState
default:
panic(fmt.Sprintf("invalid event: %s", e))
}
case partialSyncState:
e := s.syncChangesEvent()
switch e {
case syncFullNotifEvent, syncFullTimerEvent:
return fullSyncState
case syncChangesNotifEvent:
if s.Paused() {
return partialSyncState
}
err := s.State.SyncChanges()
if err != nil {
s.Logger.Error("failed to sync changes", "error", err)
}
return partialSyncState
case shutdownEvent:
return doneState
default:
panic(fmt.Sprintf("invalid event: %s", e))
}
default:
panic(fmt.Sprintf("invalid state: %s", fs))
}
}
// event defines a timing or notification event from multiple timers and
// channels.
type event string
const (
shutdownEvent event = "shutdown"
syncFullNotifEvent event = "syncFullNotif"
syncFullTimerEvent event = "syncFullTimer"
syncChangesNotifEvent event = "syncChangesNotif"
)
// retrySyncFullEventFn waits for an event which triggers a retry
// of a full sync or a termination signal. This function should not be
// called directly but through s.retryFullSyncState to allow mocking for
// testing.
func (s *StateSyncer) retrySyncFullEventFn() event {
select {
// trigger a full sync immediately.
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// retry full sync after some time
// it is using retryFailInterval because it is retrying the sync
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
s.resetNextFullSyncCh()
return syncFullTimerEvent
case <-s.ShutdownCh:
return shutdownEvent
}
}
// syncChangesEventFn waits for a event which either triggers a full
// or a partial sync or a termination signal. This function should not
// be called directly but through s.syncChangesEvent to allow mocking
// for testing.
func (s *StateSyncer) syncChangesEventFn() event {
select {
// trigger a full sync immediately
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
s.resetNextFullSyncCh()
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// time for a full sync again
case <-s.nextFullSyncCh:
s.resetNextFullSyncCh()
return syncFullTimerEvent
// do partial syncs on demand
case <-s.SyncChanges.Notif():
return syncChangesNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
}
// resetNextFullSyncCh resets nextFullSyncCh and sets it to interval+stagger.
// Call this function everytime a full sync is performed.
func (s *StateSyncer) resetNextFullSyncCh() {
if s.stagger != nil {
s.nextFullSyncCh = time.After(s.Interval + s.stagger(s.Interval))
} else {
s.nextFullSyncCh = time.After(s.Interval)
}
}
// stubbed out for testing
var libRandomStagger = lib.RandomStagger
// staggerFn returns a random duration which depends on the cluster size
// and a random factor which should provide some timely distribution of
// cluster wide events. This function should not be called directly
// but through s.stagger to allow mocking for testing.
func (s *StateSyncer) staggerFn(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return libRandomStagger(time.Duration(f) * d)
}
// Pause temporarily disables sync runs.
func (s *StateSyncer) Pause() {
s.pauseLock.Lock()
s.paused++
s.pauseLock.Unlock()
}
// Paused returns whether sync runs are temporarily disabled.
func (s *StateSyncer) Paused() bool {
s.pauseLock.Lock()
defer s.pauseLock.Unlock()
return s.paused != 0 || s.hardDisabled
}
// Disabled returns whether sync runs are permanently disabled.
func (s *StateSyncer) Disabled() bool {
s.pauseLock.Lock()
defer s.pauseLock.Unlock()
return s.hardDisabled
}
// Resume re-enables sync runs. It returns true if it was the last pause/resume
// pair on the stack and so actually caused the state syncer to resume.
func (s *StateSyncer) Resume() bool {
s.pauseLock.Lock()
s.paused--
if s.paused < 0 {
panic("unbalanced pause/resume")
}
trigger := s.paused == 0 && !s.hardDisabled
s.pauseLock.Unlock()
if trigger {
s.SyncChanges.Trigger()
}
return trigger
}