Fix race condition in watch

The Shutdown() call returned immediately, without waiting for all event
distributions to be completed. Even worse, it would close all the watcher
result channels before all the info was sent to them.

Properly wait for all distributor goroutines - currently only one - to be
finished.

This fixes the flaky test TestBroadcasterDropIfChannelFull. Bonus cleanup on
said test too.
pull/6/head
Daniel Martí 2015-08-22 15:01:56 -07:00
parent c054b20148
commit ad243edaa3
2 changed files with 16 additions and 11 deletions

View File

@ -41,8 +41,9 @@ const incomingQueueLength = 25
type Broadcaster struct { type Broadcaster struct {
lock sync.Mutex lock sync.Mutex
watchers map[int64]*broadcasterWatcher watchers map[int64]*broadcasterWatcher
nextWatcher int64 nextWatcher int64
distributing sync.WaitGroup
incoming chan Event incoming chan Event
@ -67,6 +68,7 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
watchQueueLength: queueLength, watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior, fullChannelBehavior: fullChannelBehavior,
} }
m.distributing.Add(1)
go m.loop() go m.loop()
return m return m
} }
@ -146,9 +148,14 @@ func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
} }
// Shutdown disconnects all watchers (but any queued events will still be distributed). // Shutdown disconnects all watchers (but any queued events will still be distributed).
// You must not call Action after calling Shutdown. // You must not call Action or Watch* after calling Shutdown. This call blocks
// until all events have been distributed through the outbound channels. Note
// that since they can be buffered, this means that the watchers might not
// have received the data yet as it can remain sitting in the buffered
// channel.
func (m *Broadcaster) Shutdown() { func (m *Broadcaster) Shutdown() {
close(m.incoming) close(m.incoming)
m.distributing.Wait()
} }
// loop receives from m.incoming and distributes to all watchers. // loop receives from m.incoming and distributes to all watchers.
@ -163,6 +170,7 @@ func (m *Broadcaster) loop() {
m.distribute(event) m.distribute(event)
} }
m.closeAll() m.closeAll()
m.distributing.Done()
} }
// distribute sends event to all watchers. Blocking. // distribute sends event to all watchers. Blocking.

View File

@ -124,9 +124,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
event2 := Event{Added, &myType{"bar", "hello world 2"}} event2 := Event{Added, &myType{"bar", "hello world 2"}}
// Add a couple watchers // Add a couple watchers
const testWatchers = 2 watches := make([]Interface, 2)
watches := make([]Interface, testWatchers) for i := range watches {
for i := 0; i < testWatchers; i++ {
watches[i] = m.Watch() watches[i] = m.Watch()
} }
@ -139,8 +138,8 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
// Pull events from the queue. // Pull events from the queue.
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(testWatchers) wg.Add(len(watches))
for i := 0; i < testWatchers; i++ { for i := range watches {
// Verify that each watcher only gets the first event because its watch // Verify that each watcher only gets the first event because its watch
// queue of length one was full from the first one. // queue of length one was full from the first one.
go func(watcher int, w Interface) { go func(watcher int, w Interface) {
@ -148,14 +147,12 @@ func TestBroadcasterDropIfChannelFull(t *testing.T) {
e1, ok := <-w.ResultChan() e1, ok := <-w.ResultChan()
if !ok { if !ok {
t.Errorf("Watcher %v failed to retrieve first event.", watcher) t.Errorf("Watcher %v failed to retrieve first event.", watcher)
return
} }
if e, a := event1, e1; !reflect.DeepEqual(e, a) { if e, a := event1, e1; !reflect.DeepEqual(e, a) {
t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)", t.Errorf("Watcher %v: Expected (%v, %#v), got (%v, %#v)",
watcher, e.Type, e.Object, a.Type, a.Object) watcher, e.Type, e.Object, a.Type, a.Object)
} else {
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
} }
t.Logf("Got (%v, %#v)", e1.Type, e1.Object)
e2, ok := <-w.ResultChan() e2, ok := <-w.ResultChan()
if ok { if ok {
t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.", t.Errorf("Watcher %v received second event (%v, %#v) even though it shouldn't have.",