From 86342e4bca8c553babb87008714c308cb4be8e31 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 13 Apr 2020 10:33:37 -0600 Subject: [PATCH] Fix bug in CheckConnectServiceNodes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, if a blocking query called CheckConnectServiceNodes before the gateway-services memdb table had any entries, a nil watchCh would be returned when calling serviceTerminatingGatewayNodes. This means that the blocking query would not fire if a gateway config entry was added after the watch started. In cases where the blocking query started on proxy registration, the proxy could potentially never become aware of an upstream endpoint if that upstream was going to be represented by a gateway. --- agent/consul/state/catalog.go | 2 ++ agent/consul/state/catalog_test.go | 37 ++++++++++++++++-------------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 129fd9916a..11723d489d 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2022,6 +2022,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa // Gateways are tracked in a separate table, and we append them to the result set. // We append rather than replace since it allows users to migrate a service // to the mesh with a mix of sidecars and gateways until all its instances have a sidecar. + var gatewayChan <-chan struct{} if connect { // Look up gateway nodes associated with the service _, nodes, _, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindTerminatingGateway, entMeta) @@ -2091,6 +2092,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa fallbackWS = ws // We also need to watch the iterator from earlier too. fallbackWS.Add(iter.WatchCh()) + fallbackWS.Add(gatewayChan) } else if connect { // If this is a connect query then there is a subtlety to watch out for. // In addition to watching the proxy service indexes for changes above, we diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 485966bf88..453aaba501 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -3566,23 +3566,14 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) { assert.Nil(s.EnsureService(14, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"replica"}, Address: "", Port: 8001})) assert.False(watchFired(ws)) - // Register a sidecar and a gateway for db - assert.Nil(s.EnsureService(15, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000})) - assert.True(watchFired(ws)) + // Register node and service checks + testRegisterCheck(t, s, 15, "foo", "", "check1", api.HealthPassing) + testRegisterCheck(t, s, 16, "bar", "", "check2", api.HealthPassing) + testRegisterCheck(t, s, 17, "foo", "db", "check3", api.HealthPassing) + assert.False(watchFired(ws)) - assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) - assert.True(watchFired(ws)) - - // Register node checks - testRegisterCheck(t, s, 17, "foo", "", "check1", api.HealthPassing) - testRegisterCheck(t, s, 18, "bar", "", "check2", api.HealthPassing) - - // Register checks against the services. - testRegisterCheck(t, s, 19, "foo", "db", "check3", api.HealthPassing) - testRegisterCheck(t, s, 20, "bar", "gateway", "check4", api.HealthPassing) - - // Associate gateway with db - assert.Nil(s.EnsureConfigEntry(21, &structs.TerminatingGatewayConfigEntry{ + // Watch should fire when a gateway is associated with the service, even if the gateway doesn't exist yet + assert.Nil(s.EnsureConfigEntry(18, &structs.TerminatingGatewayConfigEntry{ Kind: "terminating-gateway", Name: "gateway", Services: []structs.LinkedService{ @@ -3593,11 +3584,23 @@ func TestStateStore_CheckConnectServiceNodes_Gateways(t *testing.T) { }, nil)) assert.True(watchFired(ws)) + // Watch should fire when a gateway is added + assert.Nil(s.EnsureService(19, "bar", &structs.NodeService{Kind: structs.ServiceKindTerminatingGateway, ID: "gateway", Service: "gateway", Port: 443})) + assert.True(watchFired(ws)) + + // Watch should fire when a check is added to the gateway + testRegisterCheck(t, s, 20, "bar", "gateway", "check4", api.HealthPassing) + assert.True(watchFired(ws)) + + // Watch should fire when a different connect service is registered for db + assert.Nil(s.EnsureService(21, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", Proxy: structs.ConnectProxyConfig{DestinationServiceName: "db"}, Port: 8000})) + assert.True(watchFired(ws)) + // Read everything back. ws = memdb.NewWatchSet() idx, nodes, err = s.CheckConnectServiceNodes(ws, "db", nil) assert.Nil(err) - assert.Equal(idx, uint64(20)) + assert.Equal(idx, uint64(21)) assert.Len(nodes, 2) // Check sidecar