mirror of https://github.com/hashicorp/consul
reset `coalesceTimer` to nil as soon as the event is consumed (#11924)
* reset `coalesceTimer` to nil as soon as the event is consumed * add change log * refactor to add relevant test. * fix linter * Apply suggestions from code review Co-authored-by: Freddy <freddygv@users.noreply.github.com> * remove non needed check Co-authored-by: Freddy <freddygv@users.noreply.github.com>pull/11942/head
parent
0fdd1318e9
commit
e653f81919
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
xds: fix a deadlock when the snapshot channel already have a snapshot to be consumed.
|
||||
```
|
|
@ -127,7 +127,7 @@ func (m *Manager) Run() error {
|
|||
defer m.State.StopNotify(stateCh)
|
||||
|
||||
for {
|
||||
m.syncState()
|
||||
m.syncState(m.notifyBroadcast)
|
||||
|
||||
// Wait for a state change
|
||||
_, ok := <-stateCh
|
||||
|
@ -140,7 +140,7 @@ func (m *Manager) Run() error {
|
|||
|
||||
// syncState is called whenever the local state notifies a change. It holds the
|
||||
// lock while finding any new or updated proxies and removing deleted ones.
|
||||
func (m *Manager) syncState() {
|
||||
func (m *Manager) syncState(notifyBroadcast func(ch <-chan ConfigSnapshot)) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
@ -160,7 +160,7 @@ func (m *Manager) syncState() {
|
|||
// know that so we'd need to set it here if not during registration of the
|
||||
// proxy service. Sidecar Service in the interim can do that, but we should
|
||||
// validate more generally that that is always true.
|
||||
err := m.ensureProxyServiceLocked(svc)
|
||||
err := m.ensureProxyServiceLocked(svc, notifyBroadcast)
|
||||
if err != nil {
|
||||
m.Logger.Error("failed to watch proxy service",
|
||||
"service", sid.String(),
|
||||
|
@ -179,7 +179,7 @@ func (m *Manager) syncState() {
|
|||
}
|
||||
|
||||
// ensureProxyServiceLocked adds or changes the proxy to our state.
|
||||
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
|
||||
func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService, notifyBroadcast func(ch <-chan ConfigSnapshot)) error {
|
||||
sid := ns.CompoundServiceID()
|
||||
|
||||
// Retrieve the token used to register the service, or fallback to the
|
||||
|
@ -227,16 +227,18 @@ func (m *Manager) ensureProxyServiceLocked(ns *structs.NodeService) error {
|
|||
m.proxies[sid] = state
|
||||
|
||||
// Start a goroutine that will wait for changes and broadcast them to watchers.
|
||||
go func(ch <-chan ConfigSnapshot) {
|
||||
// Run until ch is closed
|
||||
for snap := range ch {
|
||||
m.notify(&snap)
|
||||
}
|
||||
}(ch)
|
||||
go notifyBroadcast(ch)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) notifyBroadcast(ch <-chan ConfigSnapshot) {
|
||||
// Run until ch is closed
|
||||
for snap := range ch {
|
||||
m.notify(&snap)
|
||||
}
|
||||
}
|
||||
|
||||
// removeProxyService is called when a service deregisters and frees all
|
||||
// resources for that service.
|
||||
func (m *Manager) removeProxyServiceLocked(proxyID structs.ServiceID) {
|
||||
|
|
|
@ -598,7 +598,146 @@ func TestManager_SyncState_DefaultToken(t *testing.T) {
|
|||
|
||||
err = state.AddServiceWithChecks(srv, nil, "")
|
||||
require.NoError(t, err)
|
||||
m.syncState()
|
||||
m.syncState(m.notifyBroadcast)
|
||||
|
||||
require.Equal(t, "default-token", m.proxies[srv.CompoundServiceID()].serviceInstance.token)
|
||||
}
|
||||
|
||||
func TestManager_SyncState_No_Notify(t *testing.T) {
|
||||
types := NewTestCacheTypes(t)
|
||||
c := TestCacheWithTypes(t, types)
|
||||
logger := testutil.Logger(t)
|
||||
tokens := new(token.Store)
|
||||
tokens.UpdateUserToken("default-token", token.TokenSourceConfig)
|
||||
|
||||
state := local.NewState(local.Config{}, logger, tokens)
|
||||
state.TriggerSyncChanges = func() {}
|
||||
|
||||
m, err := NewManager(ManagerConfig{
|
||||
Cache: c,
|
||||
Health: &health.Client{Cache: c, CacheName: cachetype.HealthServicesName},
|
||||
State: state,
|
||||
Tokens: tokens,
|
||||
Source: &structs.QuerySource{Datacenter: "dc1"},
|
||||
Logger: logger,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer m.Close()
|
||||
|
||||
srv := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-sidecar-proxy",
|
||||
Service: "web-sidecar-proxy",
|
||||
Port: 9999,
|
||||
Meta: map[string]string{},
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
DestinationServiceID: "web",
|
||||
DestinationServiceName: "web",
|
||||
LocalServiceAddress: "127.0.0.1",
|
||||
LocalServicePort: 8080,
|
||||
Config: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err = state.AddServiceWithChecks(srv, nil, "")
|
||||
require.NoError(t, err)
|
||||
|
||||
readEvent := make(chan bool, 1)
|
||||
snapSent := make(chan bool, 1)
|
||||
|
||||
m.syncState(func(ch <-chan ConfigSnapshot) {
|
||||
for {
|
||||
<-readEvent
|
||||
snap := <-ch
|
||||
m.notify(&snap)
|
||||
snapSent <- true
|
||||
}
|
||||
})
|
||||
|
||||
// Get the relevant notification Channel, should only have 1
|
||||
notifyCH := m.proxies[srv.CompoundServiceID()].ch
|
||||
|
||||
// update the leaf certs
|
||||
roots, issuedCert := TestCerts(t)
|
||||
notifyCH <- cache.UpdateEvent{
|
||||
CorrelationID: leafWatchID,
|
||||
Result: issuedCert,
|
||||
Err: nil,
|
||||
}
|
||||
// at this point the snapshot should not be valid and not be sent
|
||||
after := time.After(200 * time.Millisecond)
|
||||
select {
|
||||
case <-snapSent:
|
||||
t.Fatal("snap should not be valid")
|
||||
case <-after:
|
||||
|
||||
}
|
||||
|
||||
// update the root certs
|
||||
notifyCH <- cache.UpdateEvent{
|
||||
CorrelationID: rootsWatchID,
|
||||
Result: roots,
|
||||
Err: nil,
|
||||
}
|
||||
|
||||
// at this point the snapshot should not be valid and not be sent
|
||||
after = time.After(200 * time.Millisecond)
|
||||
select {
|
||||
case <-snapSent:
|
||||
t.Fatal("snap should not be valid")
|
||||
case <-after:
|
||||
|
||||
}
|
||||
|
||||
// prepare to read a snapshot update as the next update should make the snapshot valid
|
||||
readEvent <- true
|
||||
|
||||
// update the intentions
|
||||
notifyCH <- cache.UpdateEvent{
|
||||
CorrelationID: intentionsWatchID,
|
||||
Result: &structs.IndexedIntentionMatches{},
|
||||
Err: nil,
|
||||
}
|
||||
|
||||
// at this point we have a valid snapshot
|
||||
after = time.After(500 * time.Millisecond)
|
||||
select {
|
||||
case <-snapSent:
|
||||
case <-after:
|
||||
t.Fatal("snap should be valid")
|
||||
|
||||
}
|
||||
|
||||
// send two snapshots back to back without reading them to overflow the snapshot channel and get to the default use case
|
||||
for i := 0; i < 2; i++ {
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
notifyCH <- cache.UpdateEvent{
|
||||
CorrelationID: leafWatchID,
|
||||
Result: issuedCert,
|
||||
Err: nil,
|
||||
}
|
||||
}
|
||||
|
||||
// make sure that we are not receiving any snapshot and wait for the snapshots to be processed
|
||||
after = time.After(500 * time.Millisecond)
|
||||
select {
|
||||
case <-snapSent:
|
||||
t.Fatal("snap should not be sent")
|
||||
case <-after:
|
||||
}
|
||||
|
||||
// now make sure that both snapshots got propagated
|
||||
for i := 0; i < 2; i++ {
|
||||
|
||||
readEvent <- true
|
||||
after = time.After(500 * time.Millisecond)
|
||||
select {
|
||||
case <-snapSent:
|
||||
case <-after:
|
||||
t.Fatal("snap should be valid")
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -294,6 +294,8 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
|||
}
|
||||
|
||||
case <-sendCh:
|
||||
// Allow the next change to trigger a send
|
||||
coalesceTimer = nil
|
||||
// Make a deep copy of snap so we don't mutate any of the embedded structs
|
||||
// etc on future updates.
|
||||
snapCopy, err := snap.Clone()
|
||||
|
@ -307,9 +309,6 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
|||
case s.snapCh <- *snapCopy:
|
||||
s.logger.Trace("Delivered new snapshot to proxy config watchers")
|
||||
|
||||
// Allow the next change to trigger a send
|
||||
coalesceTimer = nil
|
||||
|
||||
// Skip rest of loop - there is nothing to send since nothing changed on
|
||||
// this iteration
|
||||
continue
|
||||
|
@ -320,11 +319,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
|
|||
s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")
|
||||
|
||||
// Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly.
|
||||
if coalesceTimer == nil {
|
||||
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
|
||||
sendCh <- struct{}{}
|
||||
})
|
||||
}
|
||||
coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
|
||||
sendCh <- struct{}{}
|
||||
})
|
||||
|
||||
// Do not reset coalesceTimer since we just queued a timer-based refresh
|
||||
continue
|
||||
|
|
Loading…
Reference in New Issue