Revert "ae: make stagger function pluggable for testing"

This reverts commit 066ad01c38.
pull/3607/merge
Frank Schroeder 7 years ago
parent 3d202b59bc
commit 93d03595d1
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD

@ -53,7 +53,7 @@ type StateSyncer struct {
// State contains the data that needs to be synchronized. // State contains the data that needs to be synchronized.
State State State State
// Interval is the time between two full sync runs. // Interval is the time between two regular sync runs.
Interval time.Duration Interval time.Duration
// ShutdownCh is closed when the application is shutting down. // ShutdownCh is closed when the application is shutting down.
@ -78,16 +78,17 @@ type StateSyncer struct {
// paused stores whether sync runs are temporarily disabled. // paused stores whether sync runs are temporarily disabled.
pauseLock sync.Mutex pauseLock sync.Mutex
paused int paused int
}
// stagger randomly picks a duration between 0s and the given duration. func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
stagger func(time.Duration) time.Duration return &StateSyncer{
State: state,
// serverUpInterval is the max time after which a full sync is Interval: intv,
// performed when a server has been added to the cluster. ShutdownCh: shutdownCh,
serverUpInterval time.Duration Logger: logger,
SyncFull: NewTrigger(),
// retryFailInterval is the time after which a failed full sync is retried. SyncChanges: NewTrigger(),
retryFailInterval time.Duration }
} }
const ( const (
@ -99,24 +100,6 @@ const (
retryFailIntv = 15 * time.Second retryFailIntv = 15 * time.Second
) )
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
s := &StateSyncer{
State: state,
Interval: intv,
ShutdownCh: shutdownCh,
Logger: logger,
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
serverUpInterval: serverUpIntv,
retryFailInterval: retryFailIntv,
}
s.stagger = func(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return lib.RandomStagger(time.Duration(f) * d)
}
return s
}
var errPaused = errors.New("paused") var errPaused = errors.New("paused")
// Run is the long running method to perform state synchronization // Run is the long running method to perform state synchronization
@ -126,6 +109,11 @@ func (s *StateSyncer) Run() {
panic("ClusterSize not set") panic("ClusterSize not set")
} }
stagger := func(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return lib.RandomStagger(time.Duration(f) * d)
}
FullSync: FullSync:
for { for {
// attempt a full sync // attempt a full sync
@ -141,14 +129,14 @@ FullSync:
// stagger the delay to avoid a thundering herd. // stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif(): case <-s.SyncFull.Notif():
select { select {
case <-time.After(s.stagger(s.serverUpInterval)): case <-time.After(stagger(serverUpIntv)):
case <-s.ShutdownCh: case <-s.ShutdownCh:
return return
} }
// retry full sync after some time // retry full sync after some time
// todo(fs): why don't we use s.Interval here? // todo(fs): why don't we use s.Interval here?
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)): case <-time.After(retryFailIntv + stagger(retryFailIntv)):
case <-s.ShutdownCh: case <-s.ShutdownCh:
return return
@ -165,14 +153,14 @@ FullSync:
// stagger the delay to avoid a thundering herd. // stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif(): case <-s.SyncFull.Notif():
select { select {
case <-time.After(s.stagger(s.serverUpInterval)): case <-time.After(stagger(serverUpIntv)):
continue FullSync continue FullSync
case <-s.ShutdownCh: case <-s.ShutdownCh:
return return
} }
// time for a full sync again // time for a full sync again
case <-time.After(s.Interval + s.stagger(s.Interval)): case <-time.After(s.Interval + stagger(s.Interval)):
continue FullSync continue FullSync
// do partial syncs on demand // do partial syncs on demand

Loading…
Cancel
Save