mirror of https://github.com/hashicorp/consul
agent: decouple anti-entropy from local state
The anti-entropy code manages background synchronizations of the local state on a regular basis or on demand when either the state has changed or a new consul server has been added. This patch moves the anti-entropy code into its own package and decouples it from the local state code since they are performing two different functions. To simplify code-review this revision does not make any optimizations, renames or refactorings. This will happen in subsequent commits.pull/3585/head
parent
dca5dcb68c
commit
a842dc9c2b
|
@ -0,0 +1,146 @@
|
||||||
|
// Package ae provides an anti-entropy mechanism for the local state.
|
||||||
|
package ae
|
||||||
|
|
||||||
|
import (
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/lib"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// This scale factor means we will add a minute after we cross 128 nodes,
|
||||||
|
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
|
||||||
|
// by a factor of 8.
|
||||||
|
//
|
||||||
|
// If you update this, you may need to adjust the tuning of
|
||||||
|
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
|
||||||
|
aeScaleThreshold = 128
|
||||||
|
|
||||||
|
syncStaggerIntv = 3 * time.Second
|
||||||
|
syncRetryIntv = 15 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
|
// aeScale is used to scale the time interval at which anti-entropy updates take
|
||||||
|
// place. It is used to prevent saturation as the cluster size grows.
|
||||||
|
func aeScale(d time.Duration, n int) time.Duration {
|
||||||
|
// Don't scale until we cross the threshold
|
||||||
|
if n <= aeScaleThreshold {
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
mult := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
|
||||||
|
return time.Duration(mult) * d
|
||||||
|
}
|
||||||
|
|
||||||
|
type StateSyncer struct {
|
||||||
|
// paused is used to check if we are paused. Must be the first
|
||||||
|
// element due to a go bug.
|
||||||
|
// todo(fs): which bug? still relevant?
|
||||||
|
paused int32
|
||||||
|
|
||||||
|
// State contains the data that needs to be synchronized.
|
||||||
|
State interface {
|
||||||
|
UpdateSyncState() error
|
||||||
|
SyncChanges() error
|
||||||
|
}
|
||||||
|
|
||||||
|
// Interval is the time between two sync runs.
|
||||||
|
Interval time.Duration
|
||||||
|
|
||||||
|
// ClusterSize returns the number of members in the cluster.
|
||||||
|
// todo(fs): we use this for staggering but what about a random number?
|
||||||
|
ClusterSize func() int
|
||||||
|
|
||||||
|
// ShutdownCh is closed when the application is shutting down.
|
||||||
|
ShutdownCh chan struct{}
|
||||||
|
|
||||||
|
// ConsulCh contains data when a new consul server has been added to the cluster.
|
||||||
|
ConsulCh chan struct{}
|
||||||
|
|
||||||
|
// TriggerCh contains data when a sync should run immediately.
|
||||||
|
TriggerCh chan struct{}
|
||||||
|
|
||||||
|
Logger *log.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pause is used to pause state synchronization, this can be
|
||||||
|
// used to make batch changes
|
||||||
|
func (ae *StateSyncer) Pause() {
|
||||||
|
atomic.AddInt32(&ae.paused, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resume is used to resume state synchronization
|
||||||
|
func (ae *StateSyncer) Resume() {
|
||||||
|
paused := atomic.AddInt32(&ae.paused, -1)
|
||||||
|
if paused < 0 {
|
||||||
|
panic("unbalanced State.Resume() detected")
|
||||||
|
}
|
||||||
|
ae.changeMade()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Paused is used to check if we are paused
|
||||||
|
func (ae *StateSyncer) Paused() bool {
|
||||||
|
return atomic.LoadInt32(&ae.paused) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ae *StateSyncer) changeMade() {
|
||||||
|
select {
|
||||||
|
case ae.TriggerCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// antiEntropy is a long running method used to perform anti-entropy
|
||||||
|
// between local and remote state.
|
||||||
|
func (ae *StateSyncer) Run() {
|
||||||
|
SYNC:
|
||||||
|
// Sync our state with the servers
|
||||||
|
for {
|
||||||
|
err := ae.State.UpdateSyncState()
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
ae.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
||||||
|
select {
|
||||||
|
case <-ae.ConsulCh:
|
||||||
|
// Stagger the retry on leader election, avoid a thundering heard
|
||||||
|
select {
|
||||||
|
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, ae.ClusterSize()))):
|
||||||
|
case <-ae.ShutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, ae.ClusterSize()))):
|
||||||
|
case <-ae.ShutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Force-trigger AE to pickup any changes
|
||||||
|
ae.changeMade()
|
||||||
|
|
||||||
|
// Schedule the next full sync, with a random stagger
|
||||||
|
aeIntv := aeScale(ae.Interval, ae.ClusterSize())
|
||||||
|
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
|
||||||
|
aeTimer := time.After(aeIntv)
|
||||||
|
|
||||||
|
// Wait for sync events
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-aeTimer:
|
||||||
|
goto SYNC
|
||||||
|
case <-ae.TriggerCh:
|
||||||
|
// Skip the sync if we are paused
|
||||||
|
if ae.Paused() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := ae.State.SyncChanges(); err != nil {
|
||||||
|
ae.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||||
|
}
|
||||||
|
case <-ae.ShutdownCh:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package ae
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestAE_scale(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
intv := time.Minute
|
||||||
|
if v := aeScale(intv, 100); v != intv {
|
||||||
|
t.Fatalf("Bad: %v", v)
|
||||||
|
}
|
||||||
|
if v := aeScale(intv, 200); v != 2*intv {
|
||||||
|
t.Fatalf("Bad: %v", v)
|
||||||
|
}
|
||||||
|
if v := aeScale(intv, 1000); v != 4*intv {
|
||||||
|
t.Fatalf("Bad: %v", v)
|
||||||
|
}
|
||||||
|
if v := aeScale(intv, 10000); v != 8*intv {
|
||||||
|
t.Fatalf("Bad: %v", v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAE_nestedPauseResume(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
l := new(StateSyncer)
|
||||||
|
if l.Paused() != false {
|
||||||
|
t.Fatal("syncer should be unPaused after init")
|
||||||
|
}
|
||||||
|
l.Pause()
|
||||||
|
if l.Paused() != true {
|
||||||
|
t.Fatal("syncer should be Paused after first call to Pause()")
|
||||||
|
}
|
||||||
|
l.Pause()
|
||||||
|
if l.Paused() != true {
|
||||||
|
t.Fatal("syncer should STILL be Paused after second call to Pause()")
|
||||||
|
}
|
||||||
|
l.Resume()
|
||||||
|
if l.Paused() != true {
|
||||||
|
t.Fatal("syncer should STILL be Paused after FIRST call to Resume()")
|
||||||
|
}
|
||||||
|
l.Resume()
|
||||||
|
if l.Paused() != false {
|
||||||
|
t.Fatal("syncer should NOT be Paused after SECOND call to Resume()")
|
||||||
|
}
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
err := recover()
|
||||||
|
if err == nil {
|
||||||
|
t.Fatal("unbalanced Resume() should cause a panic()")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
l.Resume()
|
||||||
|
}
|
|
@ -20,6 +20,7 @@ import (
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
"github.com/armon/go-metrics"
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/ae"
|
||||||
"github.com/hashicorp/consul/agent/config"
|
"github.com/hashicorp/consul/agent/config"
|
||||||
"github.com/hashicorp/consul/agent/consul"
|
"github.com/hashicorp/consul/agent/consul"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -109,6 +110,10 @@ type Agent struct {
|
||||||
// services and checks. Used for anti-entropy.
|
// services and checks. Used for anti-entropy.
|
||||||
state *localState
|
state *localState
|
||||||
|
|
||||||
|
// sync manages the synchronization of the local
|
||||||
|
// and the remote state.
|
||||||
|
sync *ae.StateSyncer
|
||||||
|
|
||||||
// checkReapAfter maps the check ID to a timeout after which we should
|
// checkReapAfter maps the check ID to a timeout after which we should
|
||||||
// reap its associated service
|
// reap its associated service
|
||||||
checkReapAfter map[types.CheckID]time.Duration
|
checkReapAfter map[types.CheckID]time.Duration
|
||||||
|
@ -241,8 +246,27 @@ func (a *Agent) Start() error {
|
||||||
return fmt.Errorf("Failed to setup node ID: %v", err)
|
return fmt.Errorf("Failed to setup node ID: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// create a notif channel to trigger state sychronizations
|
||||||
|
// when a consul server was added to the cluster.
|
||||||
|
consulCh := make(chan struct{}, 1)
|
||||||
|
|
||||||
|
// create a notif channel to trigger state synchronizations
|
||||||
|
// when the state has changed.
|
||||||
|
triggerCh := make(chan struct{}, 1)
|
||||||
|
|
||||||
// create the local state
|
// create the local state
|
||||||
a.state = NewLocalState(c, a.logger, a.tokens)
|
a.state = NewLocalState(c, a.logger, a.tokens, triggerCh)
|
||||||
|
|
||||||
|
// create the state synchronization manager which performs
|
||||||
|
// regular and on-demand state synchronizations (anti-entropy).
|
||||||
|
a.sync = &ae.StateSyncer{
|
||||||
|
State: a.state,
|
||||||
|
Interval: c.AEInterval,
|
||||||
|
ShutdownCh: a.shutdownCh,
|
||||||
|
ConsulCh: consulCh,
|
||||||
|
TriggerCh: triggerCh,
|
||||||
|
Logger: a.logger,
|
||||||
|
}
|
||||||
|
|
||||||
// create the config for the rpc server/client
|
// create the config for the rpc server/client
|
||||||
consulCfg, err := a.consulConfig()
|
consulCfg, err := a.consulConfig()
|
||||||
|
@ -250,8 +274,16 @@ func (a *Agent) Start() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// link consul client/server with the state
|
// ServerUp is used to inform that a new consul server is now
|
||||||
consulCfg.ServerUp = a.state.ConsulServerUp
|
// up. This can be used to speed up the sync process if we are blocking
|
||||||
|
// waiting to discover a consul server
|
||||||
|
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
|
||||||
|
consulCfg.ServerUp = func() {
|
||||||
|
select {
|
||||||
|
case consulCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Setup either the client or the server.
|
// Setup either the client or the server.
|
||||||
if c.ServerMode {
|
if c.ServerMode {
|
||||||
|
@ -262,6 +294,7 @@ func (a *Agent) Start() error {
|
||||||
|
|
||||||
a.delegate = server
|
a.delegate = server
|
||||||
a.state.delegate = server
|
a.state.delegate = server
|
||||||
|
a.sync.ClusterSize = func() int { return len(server.LANMembers()) }
|
||||||
} else {
|
} else {
|
||||||
client, err := consul.NewClientLogger(consulCfg, a.logger)
|
client, err := consul.NewClientLogger(consulCfg, a.logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -270,6 +303,7 @@ func (a *Agent) Start() error {
|
||||||
|
|
||||||
a.delegate = client
|
a.delegate = client
|
||||||
a.state.delegate = client
|
a.state.delegate = client
|
||||||
|
a.sync.ClusterSize = func() int { return len(client.LANMembers()) }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load checks/services/metadata.
|
// Load checks/services/metadata.
|
||||||
|
@ -1264,18 +1298,18 @@ func (a *Agent) WANMembers() []serf.Member {
|
||||||
// StartSync is called once Services and Checks are registered.
|
// StartSync is called once Services and Checks are registered.
|
||||||
// This is called to prevent a race between clients and the anti-entropy routines
|
// This is called to prevent a race between clients and the anti-entropy routines
|
||||||
func (a *Agent) StartSync() {
|
func (a *Agent) StartSync() {
|
||||||
// Start the anti entropy routine
|
go a.sync.Run()
|
||||||
go a.state.antiEntropy(a.shutdownCh)
|
a.logger.Printf("[INFO] agent: starting state syncer")
|
||||||
}
|
}
|
||||||
|
|
||||||
// PauseSync is used to pause anti-entropy while bulk changes are make
|
// PauseSync is used to pause anti-entropy while bulk changes are make
|
||||||
func (a *Agent) PauseSync() {
|
func (a *Agent) PauseSync() {
|
||||||
a.state.Pause()
|
a.sync.Pause()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ResumeSync is used to unpause anti-entropy after bulk changes are make
|
// ResumeSync is used to unpause anti-entropy after bulk changes are make
|
||||||
func (a *Agent) ResumeSync() {
|
func (a *Agent) ResumeSync() {
|
||||||
a.state.Resume()
|
a.sync.Resume()
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetLANCoordinate returns the coordinates of this node in the local pools
|
// GetLANCoordinate returns the coordinates of this node in the local pools
|
||||||
|
|
|
@ -304,7 +304,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
|
||||||
// services and checks to the server. If the operation fails, we only
|
// services and checks to the server. If the operation fails, we only
|
||||||
// only warn because the write did succeed and anti-entropy will sync later.
|
// only warn because the write did succeed and anti-entropy will sync later.
|
||||||
func (s *HTTPServer) syncChanges() {
|
func (s *HTTPServer) syncChanges() {
|
||||||
if err := s.agent.state.syncChanges(); err != nil {
|
if err := s.agent.state.SyncChanges(); err != nil {
|
||||||
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
114
agent/local.go
114
agent/local.go
|
@ -18,11 +18,6 @@ import (
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
syncStaggerIntv = 3 * time.Second
|
|
||||||
syncRetryIntv = 15 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
// syncStatus is used to represent the difference between
|
// syncStatus is used to represent the difference between
|
||||||
// the local and remote state, and if action needs to be taken
|
// the local and remote state, and if action needs to be taken
|
||||||
type syncStatus struct {
|
type syncStatus struct {
|
||||||
|
@ -33,7 +28,6 @@ type syncStatus struct {
|
||||||
// populated during NewLocalAgent from the agent configuration to avoid
|
// populated during NewLocalAgent from the agent configuration to avoid
|
||||||
// race conditions with the agent configuration.
|
// race conditions with the agent configuration.
|
||||||
type localStateConfig struct {
|
type localStateConfig struct {
|
||||||
AEInterval time.Duration
|
|
||||||
AdvertiseAddr string
|
AdvertiseAddr string
|
||||||
CheckUpdateInterval time.Duration
|
CheckUpdateInterval time.Duration
|
||||||
Datacenter string
|
Datacenter string
|
||||||
|
@ -47,10 +41,6 @@ type localStateConfig struct {
|
||||||
// and checks. We used it to perform anti-entropy with the
|
// and checks. We used it to perform anti-entropy with the
|
||||||
// catalog representation
|
// catalog representation
|
||||||
type localState struct {
|
type localState struct {
|
||||||
// paused is used to check if we are paused. Must be the first
|
|
||||||
// element due to a go bug.
|
|
||||||
paused int32
|
|
||||||
|
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
|
||||||
|
@ -81,10 +71,6 @@ type localState struct {
|
||||||
// metadata tracks the local metadata fields
|
// metadata tracks the local metadata fields
|
||||||
metadata map[string]string
|
metadata map[string]string
|
||||||
|
|
||||||
// consulCh is used to inform of a change to the known
|
|
||||||
// consul nodes. This may be used to retry a sync run
|
|
||||||
consulCh chan struct{}
|
|
||||||
|
|
||||||
// triggerCh is used to inform of a change to local state
|
// triggerCh is used to inform of a change to local state
|
||||||
// that requires anti-entropy with the server
|
// that requires anti-entropy with the server
|
||||||
triggerCh chan struct{}
|
triggerCh chan struct{}
|
||||||
|
@ -95,9 +81,8 @@ type localState struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewLocalState creates a is used to initialize the local state
|
// NewLocalState creates a is used to initialize the local state
|
||||||
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store) *localState {
|
func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store, triggerCh chan struct{}) *localState {
|
||||||
lc := localStateConfig{
|
lc := localStateConfig{
|
||||||
AEInterval: c.AEInterval,
|
|
||||||
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
|
AdvertiseAddr: c.AdvertiseAddrLAN.String(),
|
||||||
CheckUpdateInterval: c.CheckUpdateInterval,
|
CheckUpdateInterval: c.CheckUpdateInterval,
|
||||||
Datacenter: c.Datacenter,
|
Datacenter: c.Datacenter,
|
||||||
|
@ -122,8 +107,7 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store)
|
||||||
checkCriticalTime: make(map[types.CheckID]time.Time),
|
checkCriticalTime: make(map[types.CheckID]time.Time),
|
||||||
deferCheck: make(map[types.CheckID]*time.Timer),
|
deferCheck: make(map[types.CheckID]*time.Timer),
|
||||||
metadata: make(map[string]string),
|
metadata: make(map[string]string),
|
||||||
consulCh: make(chan struct{}, 1),
|
triggerCh: triggerCh,
|
||||||
triggerCh: make(chan struct{}, 1),
|
|
||||||
}
|
}
|
||||||
l.discardCheckOutput.Store(c.DiscardCheckOutput)
|
l.discardCheckOutput.Store(c.DiscardCheckOutput)
|
||||||
return l
|
return l
|
||||||
|
@ -131,42 +115,13 @@ func NewLocalState(c *config.RuntimeConfig, lg *log.Logger, tokens *token.Store)
|
||||||
|
|
||||||
// changeMade is used to trigger an anti-entropy run
|
// changeMade is used to trigger an anti-entropy run
|
||||||
func (l *localState) changeMade() {
|
func (l *localState) changeMade() {
|
||||||
|
// todo(fs): IMO, the non-blocking nature of this call should be hidden in the syncer
|
||||||
select {
|
select {
|
||||||
case l.triggerCh <- struct{}{}:
|
case l.triggerCh <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConsulServerUp is used to inform that a new consul server is now
|
|
||||||
// up. This can be used to speed up the sync process if we are blocking
|
|
||||||
// waiting to discover a consul server
|
|
||||||
func (l *localState) ConsulServerUp() {
|
|
||||||
select {
|
|
||||||
case l.consulCh <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pause is used to pause state synchronization, this can be
|
|
||||||
// used to make batch changes
|
|
||||||
func (l *localState) Pause() {
|
|
||||||
atomic.AddInt32(&l.paused, 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Resume is used to resume state synchronization
|
|
||||||
func (l *localState) Resume() {
|
|
||||||
paused := atomic.AddInt32(&l.paused, -1)
|
|
||||||
if paused < 0 {
|
|
||||||
panic("unbalanced localState.Resume() detected")
|
|
||||||
}
|
|
||||||
l.changeMade()
|
|
||||||
}
|
|
||||||
|
|
||||||
// isPaused is used to check if we are paused
|
|
||||||
func (l *localState) isPaused() bool {
|
|
||||||
return atomic.LoadInt32(&l.paused) > 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *localState) SetDiscardCheckOutput(b bool) {
|
func (l *localState) SetDiscardCheckOutput(b bool) {
|
||||||
l.discardCheckOutput.Store(b)
|
l.discardCheckOutput.Store(b)
|
||||||
}
|
}
|
||||||
|
@ -412,61 +367,12 @@ func (l *localState) Metadata() map[string]string {
|
||||||
return metadata
|
return metadata
|
||||||
}
|
}
|
||||||
|
|
||||||
// antiEntropy is a long running method used to perform anti-entropy
|
// UpdateSyncState does a read of the server state, and updates
|
||||||
// between local and remote state.
|
// the local sync status as appropriate
|
||||||
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
|
func (l *localState) UpdateSyncState() error {
|
||||||
SYNC:
|
if l == nil {
|
||||||
// Sync our state with the servers
|
panic("config == nil")
|
||||||
for {
|
|
||||||
err := l.setSyncState()
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
l.logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
|
|
||||||
select {
|
|
||||||
case <-l.consulCh:
|
|
||||||
// Stagger the retry on leader election, avoid a thundering heard
|
|
||||||
select {
|
|
||||||
case <-time.After(lib.RandomStagger(aeScale(syncStaggerIntv, len(l.delegate.LANMembers())))):
|
|
||||||
case <-shutdownCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-time.After(syncRetryIntv + lib.RandomStagger(aeScale(syncRetryIntv, len(l.delegate.LANMembers())))):
|
|
||||||
case <-shutdownCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Force-trigger AE to pickup any changes
|
|
||||||
l.changeMade()
|
|
||||||
|
|
||||||
// Schedule the next full sync, with a random stagger
|
|
||||||
aeIntv := aeScale(l.config.AEInterval, len(l.delegate.LANMembers()))
|
|
||||||
aeIntv = aeIntv + lib.RandomStagger(aeIntv)
|
|
||||||
aeTimer := time.After(aeIntv)
|
|
||||||
|
|
||||||
// Wait for sync events
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-aeTimer:
|
|
||||||
goto SYNC
|
|
||||||
case <-l.triggerCh:
|
|
||||||
// Skip the sync if we are paused
|
|
||||||
if l.isPaused() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if err := l.syncChanges(); err != nil {
|
|
||||||
l.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
|
|
||||||
}
|
|
||||||
case <-shutdownCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// setSyncState does a read of the server state, and updates
|
|
||||||
// the local syncStatus as appropriate
|
|
||||||
func (l *localState) setSyncState() error {
|
|
||||||
req := structs.NodeSpecificRequest{
|
req := structs.NodeSpecificRequest{
|
||||||
Datacenter: l.config.Datacenter,
|
Datacenter: l.config.Datacenter,
|
||||||
Node: l.config.NodeName,
|
Node: l.config.NodeName,
|
||||||
|
@ -590,9 +496,9 @@ func (l *localState) setSyncState() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncChanges is used to scan the status our local services and checks
|
// SyncChanges is used to scan the status our local services and checks
|
||||||
// and update any that are out of sync with the server
|
// and update any that are out of sync with the server
|
||||||
func (l *localState) syncChanges() error {
|
func (l *localState) SyncChanges() error {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
defer l.Unlock()
|
defer l.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -1482,7 +1482,7 @@ func TestAgent_serviceTokens(t *testing.T) {
|
||||||
|
|
||||||
tokens := new(token.Store)
|
tokens := new(token.Store)
|
||||||
tokens.UpdateUserToken("default")
|
tokens.UpdateUserToken("default")
|
||||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens)
|
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
|
||||||
|
|
||||||
l.AddService(&structs.NodeService{
|
l.AddService(&structs.NodeService{
|
||||||
ID: "redis",
|
ID: "redis",
|
||||||
|
@ -1511,7 +1511,7 @@ func TestAgent_checkTokens(t *testing.T) {
|
||||||
|
|
||||||
tokens := new(token.Store)
|
tokens := new(token.Store)
|
||||||
tokens.UpdateUserToken("default")
|
tokens.UpdateUserToken("default")
|
||||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens)
|
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, tokens, make(chan struct{}, 1))
|
||||||
|
|
||||||
// Returns default when no token is set
|
// Returns default when no token is set
|
||||||
if token := l.CheckToken("mem"); token != "default" {
|
if token := l.CheckToken("mem"); token != "default" {
|
||||||
|
@ -1533,7 +1533,7 @@ func TestAgent_checkTokens(t *testing.T) {
|
||||||
|
|
||||||
func TestAgent_checkCriticalTime(t *testing.T) {
|
func TestAgent_checkCriticalTime(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store))
|
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
|
||||||
|
|
||||||
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
|
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000}
|
||||||
l.AddService(svc, "")
|
l.AddService(svc, "")
|
||||||
|
@ -1595,7 +1595,7 @@ func TestAgent_checkCriticalTime(t *testing.T) {
|
||||||
|
|
||||||
func TestAgent_AddCheckFailure(t *testing.T) {
|
func TestAgent_AddCheckFailure(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store))
|
l := NewLocalState(config.DefaultRuntimeConfig(`bind_addr = "127.0.0.1" data_dir = "dummy"`), nil, new(token.Store), make(chan struct{}, 1))
|
||||||
|
|
||||||
// Add a check for a service that does not exist and verify that it fails
|
// Add a check for a service that does not exist and verify that it fails
|
||||||
checkID := types.CheckID("redis:1")
|
checkID := types.CheckID("redis:1")
|
||||||
|
@ -1613,38 +1613,6 @@ func TestAgent_AddCheckFailure(t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAgent_nestedPauseResume(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
l := new(localState)
|
|
||||||
if l.isPaused() != false {
|
|
||||||
t.Fatal("localState should be unPaused after init")
|
|
||||||
}
|
|
||||||
l.Pause()
|
|
||||||
if l.isPaused() != true {
|
|
||||||
t.Fatal("localState should be Paused after first call to Pause()")
|
|
||||||
}
|
|
||||||
l.Pause()
|
|
||||||
if l.isPaused() != true {
|
|
||||||
t.Fatal("localState should STILL be Paused after second call to Pause()")
|
|
||||||
}
|
|
||||||
l.Resume()
|
|
||||||
if l.isPaused() != true {
|
|
||||||
t.Fatal("localState should STILL be Paused after FIRST call to Resume()")
|
|
||||||
}
|
|
||||||
l.Resume()
|
|
||||||
if l.isPaused() != false {
|
|
||||||
t.Fatal("localState should NOT be Paused after SECOND call to Resume()")
|
|
||||||
}
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
err := recover()
|
|
||||||
if err == nil {
|
|
||||||
t.Fatal("unbalanced Resume() should cause a panic()")
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
l.Resume()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAgent_sendCoordinate(t *testing.T) {
|
func TestAgent_sendCoordinate(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
a := NewTestAgent(t.Name(), `
|
a := NewTestAgent(t.Name(), `
|
||||||
|
|
|
@ -4,28 +4,16 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/md5"
|
"crypto/md5"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
osuser "os/user"
|
osuser "os/user"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// This scale factor means we will add a minute after we cross 128 nodes,
|
|
||||||
// another at 256, another at 512, etc. By 8192 nodes, we will scale up
|
|
||||||
// by a factor of 8.
|
|
||||||
//
|
|
||||||
// If you update this, you may need to adjust the tuning of
|
|
||||||
// CoordinateUpdatePeriod and CoordinateUpdateMaxBatchSize.
|
|
||||||
aeScaleThreshold = 128
|
|
||||||
)
|
|
||||||
|
|
||||||
// msgpackHandle is a shared handle for encoding/decoding of
|
// msgpackHandle is a shared handle for encoding/decoding of
|
||||||
// messages
|
// messages
|
||||||
var msgpackHandle = &codec.MsgpackHandle{
|
var msgpackHandle = &codec.MsgpackHandle{
|
||||||
|
@ -33,18 +21,6 @@ var msgpackHandle = &codec.MsgpackHandle{
|
||||||
WriteExt: true,
|
WriteExt: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
// aeScale is used to scale the time interval at which anti-entropy updates take
|
|
||||||
// place. It is used to prevent saturation as the cluster size grows.
|
|
||||||
func aeScale(interval time.Duration, n int) time.Duration {
|
|
||||||
// Don't scale until we cross the threshold
|
|
||||||
if n <= aeScaleThreshold {
|
|
||||||
return interval
|
|
||||||
}
|
|
||||||
|
|
||||||
multiplier := math.Ceil(math.Log2(float64(n))-math.Log2(aeScaleThreshold)) + 1.0
|
|
||||||
return time.Duration(multiplier) * interval
|
|
||||||
}
|
|
||||||
|
|
||||||
// decodeMsgPack is used to decode a MsgPack encoded object
|
// decodeMsgPack is used to decode a MsgPack encoded object
|
||||||
func decodeMsgPack(buf []byte, out interface{}) error {
|
func decodeMsgPack(buf []byte, out interface{}) error {
|
||||||
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
||||||
|
|
|
@ -4,28 +4,10 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/testutil"
|
"github.com/hashicorp/consul/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAEScale(t *testing.T) {
|
|
||||||
t.Parallel()
|
|
||||||
intv := time.Minute
|
|
||||||
if v := aeScale(intv, 100); v != intv {
|
|
||||||
t.Fatalf("Bad: %v", v)
|
|
||||||
}
|
|
||||||
if v := aeScale(intv, 200); v != 2*intv {
|
|
||||||
t.Fatalf("Bad: %v", v)
|
|
||||||
}
|
|
||||||
if v := aeScale(intv, 1000); v != 4*intv {
|
|
||||||
t.Fatalf("Bad: %v", v)
|
|
||||||
}
|
|
||||||
if v := aeScale(intv, 10000); v != 8*intv {
|
|
||||||
t.Fatalf("Bad: %v", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestStringHash(t *testing.T) {
|
func TestStringHash(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
in := "hello world"
|
in := "hello world"
|
||||||
|
|
Loading…
Reference in New Issue