
agent: ensure node info sync and full sync. (#7189)

This fixes #7020.

There are two problems this PR solves:
  * if the node info changes, it is highly likely that service and check registrations will fail with permission errors unless their service tokens have node:write, a permission the services you register hopefully do not have.
  * the timer for a full sync gets reset on every partial sync, which means that frequent partial syncs can prevent a full sync from ever happening (see the sketch after this list).
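
To make the second problem concrete, here is a minimal, self-contained Go sketch of the old pattern (not Consul code; all names here are made up): because time.After is evaluated on every pass through the select, each partial sync re-arms the full-sync timer, and steady partial-sync traffic postpones the full sync indefinitely.

package main

import (
	"fmt"
	"time"
)

// Minimal sketch of the bug, not Consul code: time.After is re-created on
// every pass through the select, so each partial sync re-arms the
// full-sync timer and frequent partial syncs starve the full sync.
func main() {
	partialSync := make(chan struct{})
	go func() {
		for {
			time.Sleep(30 * time.Millisecond) // partial syncs arrive often
			partialSync <- struct{}{}
		}
	}()

	fullSyncInterval := 100 * time.Millisecond
	giveUp := time.After(time.Second)

	for {
		select {
		case <-time.After(fullSyncInterval): // re-armed on every iteration
			fmt.Println("full sync")
		case <-partialSync:
			// handle the partial sync; the loop restarts and the timer above resets
		case <-giveUp:
			fmt.Println("one second passed without a single full sync")
			return
		}
	}
}

Run as written, this prints the starvation message after one second and never reaches the full-sync branch.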

Instead of syncing node info last, after services and checks (which can save one RPC, since node info is included in every service sync), this change syncs node info first. It is only ever a single RPC, and it is only sent when node info has actually changed. This way node info is guaranteed to be synced even when something goes wrong with services or checks, which is more likely simply because there are many more syncs happening for them.
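
The sketch below illustrates the new ordering with hypothetical stand-in types rather than the real agent/local/state.go code: node info is pushed first when it is out of sync, and the later service and check registrations carry the in-sync flag (the SkipNodeUpdate field visible in the diff below), so they do not clobber node info.

package main

import "fmt"

// Hypothetical, heavily simplified stand-in for the agent's local state;
// the real type lives in agent/local/state.go.
type localState struct {
	nodeInfoInSync bool
	services       []string
	checks         []string
}

func (l *localState) syncNodeInfo() error {
	fmt.Println("RPC: register node info")
	l.nodeInfoInSync = true
	return nil
}

func (l *localState) syncService(name string) error {
	// The registration carries SkipNodeUpdate = l.nodeInfoInSync, so a
	// service sync does not overwrite node info that was just pushed.
	fmt.Printf("RPC: register service %q (SkipNodeUpdate=%v)\n", name, l.nodeInfoInSync)
	return nil
}

func (l *localState) syncCheck(name string) error {
	fmt.Printf("RPC: register check %q (SkipNodeUpdate=%v)\n", name, l.nodeInfoInSync)
	return nil
}

// syncChanges mirrors the ordering this PR introduces: node info first,
// then services, then checks. Even if a later service or check sync fails,
// node info has already been synced.
func (l *localState) syncChanges() error {
	if !l.nodeInfoInSync {
		if err := l.syncNodeInfo(); err != nil {
			return err
		}
	}
	for _, s := range l.services {
		if err := l.syncService(s); err != nil {
			return err
		}
	}
	for _, c := range l.checks {
		if err := l.syncCheck(c); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	l := &localState{services: []string{"web"}, checks: []string{"web-health"}}
	if err := l.syncChanges(); err != nil {
		fmt.Println("sync failed:", err)
	}
}

The real SyncChanges also handles deleted services and checks, which the sketch leaves out.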
Hans Hasselberg 5 years ago committed by GitHub
parent commit 6a18f01b42
  1. agent/ae/ae.go (22 changed lines)
  2. agent/ae/ae_test.go (1 changed line)
  3. agent/local/state.go (18 changed lines)

agent/ae/ae.go (22 changed lines)

@@ -96,6 +96,10 @@ type StateSyncer struct {
// syncChangesEvent generates an event based on multiple conditions
// when the state machine is performing partial state syncs.
syncChangesEvent func() event
// nextFullSyncCh is a chan that receives a time.Time when the next
// full sync should occur.
nextFullSyncCh <-chan time.Time
}
const (
@@ -148,6 +152,7 @@ func (s *StateSyncer) Run() {
if s.ClusterSize == nil {
panic("ClusterSize not set")
}
s.resetNextFullSyncCh()
s.runFSM(fullSyncState, s.nextFSMState)
}
@@ -245,8 +250,9 @@ func (s *StateSyncer) retrySyncFullEventFn() event {
}
// retry full sync after some time
// todo(fs): why don't we use s.Interval here?
// it is using retryFailInterval because it is retrying the sync
case <-time.After(s.retryFailInterval + s.stagger(s.retryFailInterval)):
s.resetNextFullSyncCh()
return syncFullTimerEvent
case <-s.ShutdownCh:
@@ -266,13 +272,15 @@ func (s *StateSyncer) syncChangesEventFn() event {
case <-s.SyncFull.Notif():
select {
case <-time.After(s.stagger(s.serverUpInterval)):
s.resetNextFullSyncCh()
return syncFullNotifEvent
case <-s.ShutdownCh:
return shutdownEvent
}
// time for a full sync again
case <-time.After(s.Interval + s.stagger(s.Interval)):
case <-s.nextFullSyncCh:
s.resetNextFullSyncCh()
return syncFullTimerEvent
// do partial syncs on demand
@@ -284,6 +292,16 @@ func (s *StateSyncer) syncChangesEventFn() event {
}
}
// resetNextFullSyncCh resets nextFullSyncCh and sets it to interval+stagger.
// Call this function every time a full sync is performed.
func (s *StateSyncer) resetNextFullSyncCh() {
if s.stagger != nil {
s.nextFullSyncCh = time.After(s.Interval + s.stagger(s.Interval))
} else {
s.nextFullSyncCh = time.After(s.Interval)
}
}
// stubbed out for testing
var libRandomStagger = lib.RandomStagger
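
For contrast with the buggy pattern sketched under the problem list above, here is a minimal illustration (again not Consul code) of what nextFullSyncCh and resetNextFullSyncCh achieve: the full-sync timer lives outside the select and is re-armed only when a full sync actually runs, so partial syncs leave it alone.

package main

import (
	"fmt"
	"time"
)

// Minimal sketch of the fix, not Consul code: the full-sync timer is armed
// once and re-armed only after a full sync, so partial syncs no longer
// push it back.
func main() {
	partialSync := make(chan struct{})
	go func() {
		for {
			time.Sleep(30 * time.Millisecond)
			partialSync <- struct{}{}
		}
	}()

	fullSyncInterval := 100 * time.Millisecond
	nextFullSync := time.After(fullSyncInterval) // armed once, like nextFullSyncCh
	stop := time.After(time.Second)

	for {
		select {
		case <-nextFullSync:
			fmt.Println("full sync")
			nextFullSync = time.After(fullSyncInterval) // reset only after a full sync
		case <-partialSync:
			// handle the partial sync; nextFullSync is left untouched
		case <-stop:
			return
		}
	}
}

With the timer outside the select, this version performs roughly one full sync per interval even while partial syncs keep arriving.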

agent/ae/ae_test.go (1 changed line)

@@ -400,5 +400,6 @@ func testSyncer(t *testing.T) *StateSyncer {
l := NewStateSyncer(nil, time.Second, nil, logger)
l.stagger = func(d time.Duration) time.Duration { return d }
l.ClusterSize = func() int { return 1 }
l.resetNextFullSyncCh()
return l
}

agent/local/state.go (18 changed lines)

@@ -1023,6 +1023,15 @@ func (l *State) SyncChanges() error {
l.Lock()
defer l.Unlock()
// Sync the node level info if we need to.
if l.nodeInfoInSync {
l.logger.Debug("Node info in sync")
} else {
if err := l.syncNodeInfo(); err != nil {
return err
}
}
// We will do node-level info syncing at the end, since it will get
// updated by a service or check sync anyway, given how the register
// API works.
@@ -1064,15 +1073,8 @@ func (l *State) SyncChanges() error {
return err
}
}
// Now sync the node level info if we need to, and didn't do any of
// the other sync operations.
if l.nodeInfoInSync {
l.logger.Debug("Node info in sync")
return nil
}
return l.syncNodeInfo()
}
// deleteService is used to delete a service from the server
func (l *State) deleteService(key structs.ServiceID) error {
@@ -1204,6 +1206,7 @@ func (l *State) syncService(key structs.ServiceID) error {
Service: l.services[key].Service,
EnterpriseMeta: key.EnterpriseMeta,
WriteRequest: structs.WriteRequest{Token: st},
SkipNodeUpdate: l.nodeInfoInSync,
}
// Backwards-compatibility for Consul < 0.5
@@ -1266,6 +1269,7 @@ func (l *State) syncCheck(key structs.CheckID) error {
Check: c.Check,
EnterpriseMeta: c.Check.EnterpriseMeta,
WriteRequest: structs.WriteRequest{Token: ct},
SkipNodeUpdate: l.nodeInfoInSync,
}
var serviceKey structs.ServiceID
