storage: fix resource leak in Watch (#16817)

pull/16835/head
Dan Upton 2023-03-31 13:24:19 +01:00 committed by GitHub
parent 99ba13b9fc
commit 651549c97d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 10 additions and 1 deletions

View File

@ -24,6 +24,7 @@ func (s *Server) WatchList(req *pbresource.WatchListRequest, stream pbresource.R
if err != nil {
return err
}
defer watch.Close()
for {
event, err := watch.Next(stream.Context())

View File

@ -452,6 +452,7 @@ func testListWatch(t *testing.T, opts TestOptions) {
watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix)
require.NoError(t, err)
t.Cleanup(watch.Close)
for i := 0; i < len(tc.results); i++ {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
@ -471,6 +472,7 @@ func testListWatch(t *testing.T, opts TestOptions) {
watch, err := backend.WatchList(ctx, tc.resourceType, tc.tenancy, tc.namePrefix)
require.NoError(t, err)
t.Cleanup(watch.Close)
// Write the seed data after the watch has been established.
for _, r := range seedData {

View File

@ -71,6 +71,9 @@ func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
}
}
// Close the watch and free its associated resources.
func (w *Watch) Close() { w.sub.Unsubscribe() }
var eventTopic = stream.StringTopic("resources")
type eventPayload struct {

View File

@ -269,10 +269,13 @@ type Backend interface {
}
// Watch represents a watch on a given set of resources. Call Next to get the
// next event (i.e. upsert or deletion).
// next event (i.e. upsert or deletion) and Close when you're done watching.
type Watch interface {
// Next returns the next event (i.e. upsert or deletion)
Next(ctx context.Context) (*pbresource.WatchEvent, error)
// Close the watch and free its associated resources.
Close()
}
// UnversionedType represents a pbresource.Type as it is stored without the