ae: ensure that syncs are blocked when paused

pull/3609/head
Frank Schroeder 7 years ago
parent 58d52ac580
commit aba072bd1d
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD

@ -2,9 +2,10 @@
package ae
import (
"errors"
"log"
"math"
"sync/atomic"
"sync"
"time"
"github.com/hashicorp/consul/lib"
@ -75,7 +76,8 @@ type StateSyncer struct {
SyncChanges *Trigger
// paused stores whether sync runs are temporarily disabled.
paused *toggle
pauseLock sync.Mutex
paused bool
}
func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, logger *log.Logger) *StateSyncer {
@ -86,7 +88,6 @@ func NewStateSyner(state State, intv time.Duration, shutdownCh chan struct{}, lo
Logger: logger,
SyncFull: NewTrigger(),
SyncChanges: NewTrigger(),
paused: new(toggle),
}
}
@ -99,6 +100,8 @@ const (
retryFailIntv = 15 * time.Second
)
var errPaused = errors.New("paused")
// Run is the long running method to perform state synchronization
// between local and remote servers.
func (s *StateSyncer) Run() {
@ -114,13 +117,13 @@ func (s *StateSyncer) Run() {
FullSync:
for {
// attempt a full sync
if err := s.State.SyncFull(); err != nil {
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
err := s.ifNotPausedRun(s.State.SyncFull)
if err != nil {
if err != errPaused {
s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err)
}
// retry full sync after some time or when a consul
// server was added.
select {
// trigger a full sync immediately.
// this is usually called when a consul server was added to the cluster.
// stagger the delay to avoid a thundering herd.
@ -162,10 +165,8 @@ FullSync:
// do partial syncs on demand
case <-s.SyncChanges.Notif():
if s.Paused() {
continue
}
if err := s.State.SyncChanges(); err != nil {
err := s.ifNotPausedRun(s.State.SyncChanges)
if err != nil && err != errPaused {
s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
@ -176,40 +177,39 @@ FullSync:
}
}
func (s *StateSyncer) ifNotPausedRun(f func() error) error {
s.pauseLock.Lock()
defer s.pauseLock.Unlock()
if s.paused {
return errPaused
}
return f()
}
// Pause temporarily disables sync runs.
func (s *StateSyncer) Pause() {
s.paused.On()
s.pauseLock.Lock()
if s.paused {
panic("pause while paused")
}
s.paused = true
s.pauseLock.Unlock()
}
// Paused returns whether sync runs are temporarily disabled.
func (s *StateSyncer) Paused() bool {
return s.paused.IsOn()
s.pauseLock.Lock()
defer s.pauseLock.Unlock()
return s.paused
}
// Resume re-enables sync runs.
func (s *StateSyncer) Resume() {
s.paused.Off()
s.SyncChanges.Trigger()
}
// toggle implements an on/off switch using methods from the atomic
// package. Since fields in structs that are accessed via
// atomic.Load/Add methods need to be aligned properly on some platforms
// we move that code into a separate struct.
//
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG for details
type toggle int32
func (p *toggle) On() {
atomic.AddInt32((*int32)(p), 1)
}
func (p *toggle) Off() {
if atomic.AddInt32((*int32)(p), -1) < 0 {
panic("toggle not on")
s.pauseLock.Lock()
if !s.paused {
panic("resume while not paused")
}
}
func (p *toggle) IsOn() bool {
return atomic.LoadInt32((*int32)(p)) > 0
s.paused = false
s.pauseLock.Unlock()
s.SyncChanges.Trigger()
}

Loading…
Cancel
Save