From 93d03595d100aa9ffb66a5a0ca2421a72f4fc2b8 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Mon, 23 Oct 2017 10:08:33 +0200 Subject: [PATCH] Revert "ae: make stagger function pluggable for testing" This reverts commit 066ad01c3804a39161c4d46147aa2eb86e48cd67. --- agent/ae/ae.go | 52 +++++++++++++++++++------------------------------- 1 file changed, 20 insertions(+), 32 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 36950df6b0..e20d918391 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -53,7 +53,7 @@ type StateSyncer struct { // State contains the data that needs to be synchronized. State State - // Interval is the time between two full sync runs. + // Interval is the time between two regular sync runs. Interval time.Duration // ShutdownCh is closed when the application is shutting down. @@ -78,16 +78,17 @@ type StateSyncer struct { // paused stores whether sync runs are temporarily disabled. pauseLock sync.Mutex paused int +} - // stagger randomly picks a duration between 0s and the given duration. - stagger func(time.Duration) time.Duration - - // serverUpInterval is the max time after which a full sync is - // performed when a server has been added to the cluster. - serverUpInterval time.Duration - - // retryFailInterval is the time after which a failed full sync is retried. - retryFailInterval time.Duration +func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { + return &StateSyncer{ + State: state, + Interval: intv, + ShutdownCh: shutdownCh, + Logger: logger, + SyncFull: NewTrigger(), + SyncChanges: NewTrigger(), + } } const ( @@ -99,24 +100,6 @@ const ( retryFailIntv = 15 * time.Second ) -func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { - s := &StateSyncer{ - State: state, - Interval: intv, - ShutdownCh: shutdownCh, - Logger: logger, - SyncFull: NewTrigger(), - SyncChanges: NewTrigger(), - serverUpInterval: serverUpIntv, - retryFailInterval: retryFailIntv, - } - s.stagger = func(d time.Duration) time.Duration { - f := scaleFactor(s.ClusterSize()) - return lib.RandomStagger(time.Duration(f) * d) - } - return s -} - var errPaused = errors.New("paused") // Run is the long running method to perform state synchronization @@ -126,6 +109,11 @@ func (s *StateSyncer) Run() { panic("ClusterSize not set") } + stagger := func(d time.Duration) time.Duration { + f := scaleFactor(s.ClusterSize()) + return lib.RandomStagger(time.Duration(f) * d) + } + FullSync: for { // attempt a full sync @@ -141,14 +129,14 @@ FullSync: // stagger the delay to avoid a thundering herd. case <-s.SyncFull.Notif(): select { - case <-time.After(s.stagger(s.serverUpInterval)): + case <-time.After(stagger(serverUpIntv)): case <-s.ShutdownCh: return } // retry full sync after some time // todo(fs): why don't we use s.Interval here? - case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)): + case <-time.After(retryFailIntv + stagger(retryFailIntv)): case <-s.ShutdownCh: return @@ -165,14 +153,14 @@ FullSync: // stagger the delay to avoid a thundering herd. case <-s.SyncFull.Notif(): select { - case <-time.After(s.stagger(s.serverUpInterval)): + case <-time.After(stagger(serverUpIntv)): continue FullSync case <-s.ShutdownCh: return } // time for a full sync again - case <-time.After(s.Interval + s.stagger(s.Interval)): + case <-time.After(s.Interval + stagger(s.Interval)): continue FullSync // do partial syncs on demand