diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index b6eb4aa09c..4452298b76 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -955,22 +955,43 @@ func (s *Store) ServicesByNodeMeta(ws memdb.WatchSet, filters map[string]string) // * block until an instance for this service is available, or another // service is unregistered. func maxIndexForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) uint64 { + idx, _ := maxIndexAndWatchChForService(tx, serviceName, serviceExists, checks) + return idx +} + +// maxIndexAndWatchChForService returns the maximum Raft Index for a service. If +// the index is not set for the service, it will return the missing service +// index. The service_last_extinction is set to the last raft index when a +// service was unregistered (or 0 if no services were ever unregistered). This +// allows blocking queries to +// * return when the last instance of a service is removed +// * block until an instance for this service is available, or another +// service is unregistered. +// +// It also _may_ return a watch chan to add to a WatchSet. It will only return +// one if the service exists, and has a service index. If it doesn't then nil is +// returned for the chan. This allows for blocking watchers to _only_ watch this +// one chan in the common case, falling back to watching all touched MemDB +// indexes in more complicated cases. +func maxIndexAndWatchChForService(tx *memdb.Txn, serviceName string, serviceExists, checks bool) (uint64, <-chan struct{}) { if !serviceExists { res, err := tx.First("index", "id", serviceLastExtinctionIndexName) if missingIdx, ok := res.(*IndexEntry); ok && err == nil { - return missingIdx.Value + // Not safe to only watch the extinction index as it's not updated when + // new instances come along so return nil watchCh. 
+ return missingIdx.Value, nil } } - res, err := tx.First("index", "id", serviceIndexName(serviceName)) + ch, res, err := tx.FirstWatch("index", "id", serviceIndexName(serviceName)) if idx, ok := res.(*IndexEntry); ok && err == nil { - return idx.Value + return idx.Value, ch } if checks { - return maxIndexTxn(tx, "nodes", "services", "checks") + return maxIndexTxn(tx, "nodes", "services", "checks"), nil } - return maxIndexTxn(tx, "nodes", "services") + return maxIndexTxn(tx, "nodes", "services"), nil } // ConnectServiceNodes returns the nodes associated with a Connect @@ -1870,7 +1891,8 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect if err != nil { return 0, nil, fmt.Errorf("failed service lookup: %s", err) } - ws.Add(iter.WatchCh()) + // Note we decide if we want to watch this iterator or not down below. We need + // to see if it returned anything first. // Return the results. var results structs.ServiceNodes @@ -1879,9 +1901,31 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect } // Get the table index. - idx := maxIndexForService(tx, serviceName, len(results) > 0, true) + idx, ch := maxIndexAndWatchChForService(tx, serviceName, len(results) > 0, true) + + // Create a nil watchset to pass below, we'll only pass the real one if we + // need to. Nil watchers are safe/allowed and saves some allocation too. + var fallbackWS memdb.WatchSet + if ch == nil { + // There was no explicit channel returned that corresponds to the service + // index. That means we need to fallback to watching everything we touch in + // the DB as normal. We plumb the caller's watchset through (note it's a map + // so this is a by-reference assignment.) + fallbackWS = ws + // We also need to watch the iterator from earlier too. + fallbackWS.Add(iter.WatchCh()) + } else { + // There was a valid service index, and non-empty result. 
In this case it is + // sufficient just to watch the service index's chan since that _must_ be + // written to if the result of this method is going to change. This saves us + // watching potentially thousands of watch chans for large services which + // may need many goroutines. It also avoids the performance cliff that is hit + // when watchLimit is hit (~682 service instances). See + // https://github.com/hashicorp/consul/issues/4984 + ws.Add(ch) + } - return s.parseCheckServiceNodes(tx, ws, idx, serviceName, results, err) + return s.parseCheckServiceNodes(tx, fallbackWS, idx, serviceName, results, err) } // CheckServiceTagNodes is used to query all nodes and checks for a given diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 8eca00c5b6..ed29ebd040 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -3021,43 +3021,11 @@ func TestStateStore_CheckServiceNodes(t *testing.T) { t.Fatalf("bad") } - // Overwhelm node and check tracking. - idx = 13 - for i := 0; i < 2*watchLimit; i++ { - node := fmt.Sprintf("many%d", i) - testRegisterNode(t, s, idx, node) - idx++ - testRegisterCheck(t, s, idx, node, "", "check1", api.HealthPassing) - idx++ - testRegisterService(t, s, idx, node, "service1") - idx++ - testRegisterCheck(t, s, idx, node, "service1", "check2", api.HealthPassing) - idx++ - } - - // Now registering an unrelated node will fire the watch. - ws = memdb.NewWatchSet() - idx, results, err = s.CheckServiceNodes(ws, "service1") - if err != nil { - t.Fatalf("err: %s", err) - } - testRegisterNode(t, s, idx, "more-nope") - idx++ - if !watchFired(ws) { - t.Fatalf("bad") - } - - // Also, registering an unrelated check will fire the watch. 
- ws = memdb.NewWatchSet() - idx, results, err = s.CheckServiceNodes(ws, "service1") - if err != nil { - t.Fatalf("err: %s", err) - } - testRegisterCheck(t, s, idx, "more-nope", "", "check1", api.HealthPassing) - idx++ - if !watchFired(ws) { - t.Fatalf("bad") - } + // Note that we can't overwhelm chan tracking any more since we optimized it + // to only need to watch one chan in the happy path. The only path that does + // need to watch more stuff is where there are no service instances which also + // means fewer than watchLimit chans too so effectively no way to trigger + // fallback watch any more. } func TestStateStore_CheckConnectServiceNodes(t *testing.T) {