mirror of https://github.com/hashicorp/consul
ae: make stagger function pluggable for testing
parent
e2452efed8
commit
11e172d1e9
|
@ -53,7 +53,7 @@ type StateSyncer struct {
|
|||
// State contains the data that needs to be synchronized.
|
||||
State State
|
||||
|
||||
// Interval is the time between two regular sync runs.
|
||||
// Interval is the time between two full sync runs.
|
||||
Interval time.Duration
|
||||
|
||||
// ShutdownCh is closed when the application is shutting down.
|
||||
|
@ -78,17 +78,16 @@ type StateSyncer struct {
|
|||
// paused stores whether sync runs are temporarily disabled.
|
||||
pauseLock sync.Mutex
|
||||
paused int
|
||||
}
|
||||
|
||||
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
|
||||
return &StateSyncer{
|
||||
State: state,
|
||||
Interval: intv,
|
||||
ShutdownCh: shutdownCh,
|
||||
Logger: logger,
|
||||
SyncFull: NewTrigger(),
|
||||
SyncChanges: NewTrigger(),
|
||||
}
|
||||
// stagger randomly picks a duration between 0s and the given duration.
|
||||
stagger func(time.Duration) time.Duration
|
||||
|
||||
// serverUpInterval is the max time after which a full sync is
|
||||
// performed when a server has been added to the cluster.
|
||||
serverUpInterval time.Duration
|
||||
|
||||
// retryFailInterval is the time after which a failed full sync is retried.
|
||||
retryFailInterval time.Duration
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -100,6 +99,24 @@ const (
|
|||
retryFailIntv = 15 * time.Second
|
||||
)
|
||||
|
||||
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
|
||||
s := &StateSyncer{
|
||||
State: state,
|
||||
Interval: intv,
|
||||
ShutdownCh: shutdownCh,
|
||||
Logger: logger,
|
||||
SyncFull: NewTrigger(),
|
||||
SyncChanges: NewTrigger(),
|
||||
serverUpInterval: serverUpIntv,
|
||||
retryFailInterval: retryFailIntv,
|
||||
}
|
||||
s.stagger = func(d time.Duration) time.Duration {
|
||||
f := scaleFactor(s.ClusterSize())
|
||||
return lib.RandomStagger(time.Duration(f) * d)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
var errPaused = errors.New("paused")
|
||||
|
||||
// Run is the long running method to perform state synchronization
|
||||
|
@ -109,11 +126,6 @@ func (s *StateSyncer) Run() {
|
|||
panic("ClusterSize not set")
|
||||
}
|
||||
|
||||
stagger := func(d time.Duration) time.Duration {
|
||||
f := scaleFactor(s.ClusterSize())
|
||||
return lib.RandomStagger(time.Duration(f) * d)
|
||||
}
|
||||
|
||||
FullSync:
|
||||
for {
|
||||
// attempt a full sync
|
||||
|
@ -129,14 +141,14 @@ FullSync:
|
|||
// stagger the delay to avoid a thundering herd.
|
||||
case <-s.SyncFull.Notif():
|
||||
select {
|
||||
case <-time.After(stagger(serverUpIntv)):
|
||||
case <-time.After(s.stagger(s.serverUpInterval)):
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
// retry full sync after some time
|
||||
// todo(fs): why don't we use s.Interval here?
|
||||
case <-time.After(retryFailIntv + stagger(retryFailIntv)):
|
||||
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
|
||||
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
|
@ -153,14 +165,14 @@ FullSync:
|
|||
// stagger the delay to avoid a thundering herd.
|
||||
case <-s.SyncFull.Notif():
|
||||
select {
|
||||
case <-time.After(stagger(serverUpIntv)):
|
||||
case <-time.After(s.stagger(s.serverUpInterval)):
|
||||
continue FullSync
|
||||
case <-s.ShutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
// time for a full sync again
|
||||
case <-time.After(s.Interval + stagger(s.Interval)):
|
||||
case <-time.After(s.Interval + s.stagger(s.Interval)):
|
||||
continue FullSync
|
||||
|
||||
// do partial syncs on demand
|
||||
|
|
Loading…
Reference in New Issue