From 066ad01c3804a39161c4d46147aa2eb86e48cd67 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 19 Oct 2017 11:12:56 +0200 Subject: [PATCH] ae: make stagger function pluggable for testing --- agent/ae/ae.go | 52 +++++++++++++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index e20d918391..36950df6b0 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -53,7 +53,7 @@ type StateSyncer struct { // State contains the data that needs to be synchronized. State State - // Interval is the time between two regular sync runs. + // Interval is the time between two full sync runs. Interval time.Duration // ShutdownCh is closed when the application is shutting down. @@ -78,17 +78,16 @@ type StateSyncer struct { // paused stores whether sync runs are temporarily disabled. pauseLock sync.Mutex paused int -} -func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { - return &StateSyncer{ - State: state, - Interval: intv, - ShutdownCh: shutdownCh, - Logger: logger, - SyncFull: NewTrigger(), - SyncChanges: NewTrigger(), - } + // stagger randomly picks a duration between 0s and the given duration. + stagger func(time.Duration) time.Duration + + // serverUpInterval is the max time after which a full sync is + // performed when a server has been added to the cluster. + serverUpInterval time.Duration + + // retryFailInterval is the time after which a failed full sync is retried. + retryFailInterval time.Duration } const ( @@ -100,6 +99,24 @@ const ( retryFailIntv = 15 * time.Second ) +func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer { + s := &StateSyncer{ + State: state, + Interval: intv, + ShutdownCh: shutdownCh, + Logger: logger, + SyncFull: NewTrigger(), + SyncChanges: NewTrigger(), + serverUpInterval: serverUpIntv, + retryFailInterval: retryFailIntv, + } + s.stagger = func(d time.Duration) time.Duration { + f := scaleFactor(s.ClusterSize()) + return lib.RandomStagger(time.Duration(f) * d) + } + return s +} + var errPaused = errors.New("paused") // Run is the long running method to perform state synchronization @@ -109,11 +126,6 @@ func (s *StateSyncer) Run() { panic("ClusterSize not set") } - stagger := func(d time.Duration) time.Duration { - f := scaleFactor(s.ClusterSize()) - return lib.RandomStagger(time.Duration(f) * d) - } - FullSync: for { // attempt a full sync @@ -129,14 +141,14 @@ FullSync: // stagger the delay to avoid a thundering herd. case <-s.SyncFull.Notif(): select { - case <-time.After(stagger(serverUpIntv)): + case <-time.After(s.stagger(s.serverUpInterval)): case <-s.ShutdownCh: return } // retry full sync after some time // todo(fs): why don't we use s.Interval here? - case <-time.After(retryFailIntv + stagger(retryFailIntv)): + case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)): case <-s.ShutdownCh: return @@ -153,14 +165,14 @@ FullSync: // stagger the delay to avoid a thundering herd. case <-s.SyncFull.Notif(): select { - case <-time.After(stagger(serverUpIntv)): + case <-time.After(s.stagger(s.serverUpInterval)): continue FullSync case <-s.ShutdownCh: return } // time for a full sync again - case <-time.After(s.Interval + stagger(s.Interval)): + case <-time.After(s.Interval + s.stagger(s.Interval)): continue FullSync // do partial syncs on demand