From 651549c97d38f75262b2fcce79b23d3a857404ed Mon Sep 17 00:00:00 2001 From: Dan Upton Date: Fri, 31 Mar 2023 13:24:19 +0100 Subject: [PATCH] storage: fix resource leak in Watch (#16817) --- agent/grpc-external/services/resource/watch.go | 1 + internal/storage/conformance/conformance.go | 2 ++ internal/storage/inmem/watch.go | 3 +++ internal/storage/storage.go | 5 ++++- 4 files changed, 10 insertions(+), 1 deletion(-) diff --git a/agent/grpc-external/services/resource/watch.go b/agent/grpc-external/services/resource/watch.go index 54372221b9..23be7177f5 100644 --- a/agent/grpc-external/services/resource/watch.go +++ b/agent/grpc-external/services/resource/watch.go @@ -24,6 +24,7 @@ func (s *Server) WatchList(req *pbresource.WatchListRequest, stream pbresource.R if err != nil { return err } + defer watch.Close() for { event, err := watch.Next(stream.Context()) diff --git a/internal/storage/conformance/conformance.go b/internal/storage/conformance/conformance.go index 56e49f1fea..a4ef0e85d7 100644 --- a/internal/storage/conformance/conformance.go +++ b/internal/storage/conformance/conformance.go @@ -452,6 +452,7 @@ func testListWatch(t *testing.T, opts TestOptions) { watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) require.NoError(t, err) + t.Cleanup(watch.Close) for i := 0; i < len(tc.results); i++ { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) @@ -471,6 +472,7 @@ func testListWatch(t *testing.T, opts TestOptions) { watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix) require.NoError(t, err) + t.Cleanup(watch.Close) // Write the seed data after the watch has been established. for _, r := range seedData { diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go index 9d61f7179d..b3e6132590 100644 --- a/internal/storage/inmem/watch.go +++ b/internal/storage/inmem/watch.go @@ -71,6 +71,9 @@ func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) { } } +// Close the watch and free its associated resources. +func (w *Watch) Close() { w.sub.Unsubscribe() } + var eventTopic = stream.StringTopic("resources") type eventPayload struct { diff --git a/internal/storage/storage.go b/internal/storage/storage.go index f9b2e6748d..aa0331c392 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -269,10 +269,13 @@ type Backend interface { } // Watch represents a watch on a given set of resources. Call Next to get the -// next event (i.e. upsert or deletion). +// next event (i.e. upsert or deletion) and Close when you're done watching. type Watch interface { // Next returns the next event (i.e. upsert or deletion) Next(ctx context.Context) (*pbresource.WatchEvent, error) + + // Close the watch and free its associated resources. + Close() } // UnversionedType represents a pbresource.Type as it is stored without the