From cbaf97bced8e791dce9792bfbadb46194d88f2b3 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Mon, 28 Aug 2017 14:17:10 +0200 Subject: [PATCH] agent: refactor sync loop to linear flow of control --- agent/ae/ae.go | 105 +++++++++++++++++++++++++------------------------ 1 file changed, 53 insertions(+), 52 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index 78bf93e251..f3b6745036 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -93,66 +93,67 @@ func (s *StateSyncer) Run() { Sync: for { - // update the sync status - err := s.State.UpdateSyncState() - if err == nil { - break - } - - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) + switch err := s.State.UpdateSyncState(); { - // retry updating sync status after some time or when a consul - // server was added. - select { + // update sync status failed + case err != nil: + s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - // consul server added to cluster. - // retry sooner than retryFailIntv to converge cluster quicker - // but stagger delay to avoid thundering herd - case <-s.ServerUpCh: + // retry updating sync status after some time or when a consul + // server was added. select { - case <-time.After(stagger(serverUpIntv)): - case <-s.ShutdownCh: - return - } - - // retry full sync after some time - // todo(fs): why don't we use s.Interval here? - case <-time.After(retryFailIntv + stagger(retryFailIntv)): - case <-s.ShutdownCh: - return - } - } + // consul server added to cluster. + // retry sooner than retryFailIntv to converge cluster sooner + // but stagger delay to avoid thundering herd + case <-s.ServerUpCh: + select { + case <-time.After(stagger(serverUpIntv)): + case <-s.ShutdownCh: + return + } - // Force-trigger sync to pickup any changes - s.triggerSync() + // retry full sync after some time + // todo(fs): why don't we use s.Interval here? + case <-time.After(retryFailIntv + stagger(retryFailIntv)): - // Wait for sync events - for { - select { - // todo(fs): why don't we honor the ServerUpCh here as well? - // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) - // case <-s.ServerUpCh: - // select { - // case <-time.After(stagger(serverUpIntv)): - // continue Sync - // case <-s.ShutdownCh: - // return - // } - - case <-time.After(s.Interval + stagger(s.Interval)): - goto Sync - - case <-s.TriggerCh: - if s.Paused() { - continue - } - if err := s.State.SyncChanges(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) + case <-s.ShutdownCh: + return } - case <-s.ShutdownCh: - return + // update sync status OK + default: + // force-trigger sync to pickup any changes + s.triggerSync() + + // do partial syncs until it is time for a full sync again + for { + select { + // todo(fs): why don't we honor the ServerUpCh here as well? + // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) + // case <-s.ServerUpCh: + // select { + // case <-time.After(stagger(serverUpIntv)): + // continue Sync + // case <-s.ShutdownCh: + // return + // } + + case <-time.After(s.Interval + stagger(s.Interval)): + continue Sync + + case <-s.TriggerCh: + if s.Paused() { + continue + } + if err := s.State.SyncChanges(); err != nil { + s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) + } + + case <-s.ShutdownCh: + return + } + } } } }