ae: make stagger function pluggable for testing

pull/3585/head
Frank Schroeder 7 years ago committed by Frank Schröder
parent 126046be23
commit 066ad01c38

@ -53,7 +53,7 @@ type StateSyncer struct {
// State contains the data that needs to be synchronized.
State State
// Interval is the time between two regular sync runs.
// Interval is the time between two full sync runs.
Interval time.Duration
// ShutdownCh is closed when the application is shutting down.
@ -78,17 +78,16 @@ type StateSyncer struct {
// paused stores whether sync runs are temporarily disabled.
pauseLock sync.Mutex
paused int
}
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
return &StateSyncer{
State: state,
Interval: intv,
ShutdownCh: shutdownCh,
Logger: logger,
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
}
// stagger randomly picks a duration between 0s and the given duration.
stagger func(time.Duration) time.Duration
// serverUpInterval is the max time after which a full sync is
// performed when a server has been added to the cluster.
serverUpInterval time.Duration
// retryFailInterval is the time after which a failed full sync is retried.
retryFailInterval time.Duration
}
const (
@ -100,6 +99,24 @@ const (
retryFailIntv = 15 * time.Second
)
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
s := &StateSyncer{
State: state,
Interval: intv,
ShutdownCh: shutdownCh,
Logger: logger,
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
serverUpInterval: serverUpIntv,
retryFailInterval: retryFailIntv,
}
s.stagger = func(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return lib.RandomStagger(time.Duration(f) * d)
}
return s
}
var errPaused = errors.New("paused")
// Run is the long running method to perform state synchronization
@ -109,11 +126,6 @@ func (s *StateSyncer) Run() {
panic("ClusterSize not set")
}
stagger := func(d time.Duration) time.Duration {
f := scaleFactor(s.ClusterSize())
return lib.RandomStagger(time.Duration(f) * d)
}
FullSync:
for {
// attempt a full sync
@ -129,14 +141,14 @@ FullSync:
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(stagger(serverUpIntv)):
case <-time.After(s.stagger(s.serverUpInterval)):
case <-s.ShutdownCh:
return
}
// retry full sync after some time
// todo(fs): why don't we use s.Interval here?
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
case <-s.ShutdownCh:
return
@ -153,14 +165,14 @@ FullSync:
// stagger the delay to avoid a thundering herd.
case <-s.SyncFull.Notif():
select {
case <-time.After(stagger(serverUpIntv)):
case <-time.After(s.stagger(s.serverUpInterval)):
continue FullSync
case <-s.ShutdownCh:
return
}
// time for a full sync again
case <-time.After(s.Interval + stagger(s.Interval)):
case <-time.After(s.Interval + s.stagger(s.Interval)):
continue FullSync
// do partial syncs on demand

Loading…
Cancel
Save