diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 5d1f71768d..ccae32264f 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -41,8 +41,9 @@ const incomingQueueLength = 25 type Broadcaster struct { lock sync.Mutex - watchers map[int64]*broadcasterWatcher - nextWatcher int64 + watchers map[int64]*broadcasterWatcher + nextWatcher int64 + distributing sync.WaitGroup incoming chan Event @@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B watchQueueLength: queueLength, fullChannelBehavior: fullChannelBehavior, } + m.distributing.Add(1) go m.loop() return m } @@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) { } // Shutdown disconnects all watchers (but any queued events will still be distributed). -// You must not call Action after calling Shutdown. +// You must not call Action or Watch* after calling Shutdown. This call blocks +// until all events have been distributed through the outbound channels. Note +// that since they can be buffered, this means that the watchers might not +// have received the data yet as it can remain sitting in the buffered +// channel. func (m *Broadcaster) Shutdown() { close(m.incoming) + m.distributing.Wait() } // loop receives from m.incoming and distributes to all watchers. @@ -163,6 +170,7 @@ func (m *Broadcaster) loop() { m.distribute(event) } m.closeAll() + m.distributing.Done() } // distribute sends event to all watchers. Blocking. diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index fd31910060..d3e48279cc 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { event2 := Event{Added, &myType{"bar", "hello world 2"}} // Add a couple watchers - const testWatchers = 2 - watches := make([]Interface, testWatchers) - for i := 0; i < testWatchers; i++ { + watches := make([]Interface, 2) + for i := range watches { watches[i] = m.Watch() } @@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { // Pull events from the queue. wg := sync.WaitGroup{} - wg.Add(testWatchers) - for i := 0; i < testWatchers; i++ { + wg.Add(len(watches)) + for i := range watches { // Verify that each watcher only gets the first event because its watch // queue of length one was full from the first one. go func(watcher int, w Interface) { @@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) { e1, ok := <-w.ResultChan() if !ok { t.Errorf("Watcher %v failed to retrieve first event.", watcher) - return } if e, a := event1, e1; !reflect.DeepEqual(e, a) { t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", watcher, e.Type, e.Object, a.Type, a.Object) - } else { - t.Logf("Got (%v, %#v)", e1.Type, e1.Object) } + t.Logf("Got (%v, %#v)", e1.Type, e1.Object) e2, ok := <-w.ResultChan() if ok { t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",