diff --git a/.changelog/14916.txt b/.changelog/14916.txt new file mode 100644 index 0000000000..2012424890 --- /dev/null +++ b/.changelog/14916.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: fix goroutine/memory leaks in the xDS subsystem (these were present regardless of whether or not xDS was in-use) +``` diff --git a/agent/consul/xdscapacity/capacity.go b/agent/consul/xdscapacity/capacity.go index 2e24a09e0e..f1974b9641 100644 --- a/agent/consul/xdscapacity/capacity.go +++ b/agent/consul/xdscapacity/capacity.go @@ -79,7 +79,7 @@ func NewController(cfg Config) *Controller { func (c *Controller) Run(ctx context.Context) { defer close(c.doneCh) - ws, numProxies, err := c.countProxies(ctx) + watchCh, numProxies, err := c.countProxies(ctx) if err != nil { return } @@ -90,8 +90,8 @@ func (c *Controller) Run(ctx context.Context) { case s := <-c.serverCh: numServers = s c.updateMaxSessions(numServers, numProxies) - case <-ws.WatchCh(ctx): - ws, numProxies, err = c.countProxies(ctx) + case <-watchCh: + watchCh, numProxies, err = c.countProxies(ctx) if err != nil { return } @@ -170,7 +170,7 @@ func (c *Controller) updateMaxSessions(numServers, numProxies uint32) { // countProxies counts the number of registered proxy services, retrying on // error until the given context is cancelled. -func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) { +func (c *Controller) countProxies(ctx context.Context) (<-chan error, uint32, error) { retryWaiter := &retry.Waiter{ MinFailures: 1, MinWait: 1 * time.Second, @@ -200,7 +200,7 @@ func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, count += uint32(kindCount) } } - return ws, count, nil + return ws.WatchCh(ctx), count, nil } } diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index 56f222fcda..6a49e2812e 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -141,10 +141,17 @@ func (m *ConfigSource) startSync(closeCh <-chan chan struct{}, proxyID proxycfg. } syncLoop := func(ws memdb.WatchSet) { + // Cancel the context on return to clean up the goroutine started by WatchCh. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for { select { - case <-ws.WatchCh(context.Background()): + case <-ws.WatchCh(ctx): // Something changed, unblock and re-run the query. + // + // It is expected that all other branches of this select will return and + // cancel the context given to WatchCh (to clean up its goroutine). case doneCh := <-closeCh: // All watchers of this service (xDS streams) have gone away, so it's time // to free its resources.