From 26a155eb4119aabcad5f676a4a4e050599817083 Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Mon, 23 Oct 2017 10:08:35 +0200 Subject: [PATCH] Revert "agent: refactor sync loop to linear flow of control" This reverts commit 7a2af206ea964fc0846f9b80c10ea9d91cb3c99e. --- agent/ae/ae.go | 105 ++++++++++++++++++++++++------------------------- 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/agent/ae/ae.go b/agent/ae/ae.go index f3b6745036..78bf93e251 100644 --- a/agent/ae/ae.go +++ b/agent/ae/ae.go @@ -93,67 +93,66 @@ func (s *StateSyncer) Run() { Sync: for { - switch err := s.State.UpdateSyncState(); { - - // update sync status failed - case err != nil: - s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - - // retry updating sync status after some time or when a consul - // server was added. - select { + // update the sync status + err := s.State.UpdateSyncState() + if err == nil { + break + } - // consul server added to cluster. - // retry sooner than retryFailIntv to converge cluster sooner - // but stagger delay to avoid thundering herd - case <-s.ServerUpCh: - select { - case <-time.After(stagger(serverUpIntv)): - case <-s.ShutdownCh: - return - } + s.Logger.Printf("[ERR] agent: failed to sync remote state: %v", err) - // retry full sync after some time - // todo(fs): why don't we use s.Interval here? - case <-time.After(retryFailIntv + stagger(retryFailIntv)): + // retry updating sync status after some time or when a consul + // server was added. + select { + // consul server added to cluster. + // retry sooner than retryFailIntv to converge cluster quicker + // but stagger delay to avoid thundering herd + case <-s.ServerUpCh: + select { + case <-time.After(stagger(serverUpIntv)): case <-s.ShutdownCh: return } - // update sync status OK - default: - // force-trigger sync to pickup any changes - s.triggerSync() - - // do partial syncs until it is time for a full sync again - for { - select { - // todo(fs): why don't we honor the ServerUpCh here as well? - // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) - // case <-s.ServerUpCh: - // select { - // case <-time.After(stagger(serverUpIntv)): - // continue Sync - // case <-s.ShutdownCh: - // return - // } - - case <-time.After(s.Interval + stagger(s.Interval)): - continue Sync - - case <-s.TriggerCh: - if s.Paused() { - continue - } - if err := s.State.SyncChanges(); err != nil { - s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) - } - - case <-s.ShutdownCh: - return - } + // retry full sync after some time + // todo(fs): why don't we use s.Interval here? + case <-time.After(retryFailIntv + stagger(retryFailIntv)): + + case <-s.ShutdownCh: + return + } + } + + // Force-trigger sync to pickup any changes + s.triggerSync() + + // Wait for sync events + for { + select { + // todo(fs): why don't we honor the ServerUpCh here as well? + // todo(fs): by default, s.Interval is 60s which is >> 3s (serverUpIntv) + // case <-s.ServerUpCh: + // select { + // case <-time.After(stagger(serverUpIntv)): + // continue Sync + // case <-s.ShutdownCh: + // return + // } + + case <-time.After(s.Interval + stagger(s.Interval)): + goto Sync + + case <-s.TriggerCh: + if s.Paused() { + continue + } + if err := s.State.SyncChanges(); err != nil { + s.Logger.Printf("[ERR] agent: failed to sync changes: %v", err) } + + case <-s.ShutdownCh: + return } } }