From 3f285269ccc54a37c4433c073ad615c62c82ffa4 Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Thu, 4 Dec 2014 00:30:51 -0800 Subject: [PATCH] Rename watch.Mux -> watch.Broadcaster A few reasons: - Mux is already widely used in the codebase to refer to a http handler mux. - Original meaning of Mux was something which sent a chose one of several inputs to and output. This sends one output to all outputs. Broadcast captures that idea better. - Aligns with similar class config.Broadcaster (see #2747) --- pkg/client/record/event.go | 2 +- pkg/registry/event/rest_test.go | 2 +- pkg/registry/registrytest/generic.go | 14 ++++----- pkg/registry/registrytest/pod.go | 16 +++++----- pkg/watch/mux.go | 44 ++++++++++++++-------------- pkg/watch/mux_test.go | 16 +++++----- 6 files changed, 47 insertions(+), 47 deletions(-) diff --git a/pkg/client/record/event.go b/pkg/client/record/event.go index c3e52119c0..bd941ae451 100644 --- a/pkg/client/record/event.go +++ b/pkg/client/record/event.go @@ -118,7 +118,7 @@ func GetEvents(f func(*api.Event)) watch.Interface { const queueLen = 1000 -var events = watch.NewMux(queueLen) +var events = watch.NewBroadcaster(queueLen) // Event constructs an event from the given information and puts it in the queue for sending. // 'object' is the object this event is about. Event will make a reference-- or you may also diff --git a/pkg/registry/event/rest_test.go b/pkg/registry/event/rest_test.go index 0f62ad8e67..095813c6ed 100644 --- a/pkg/registry/event/rest_test.go +++ b/pkg/registry/event/rest_test.go @@ -257,7 +257,7 @@ func TestRESTWatch(t *testing.T) { t.Fatalf("Unexpected error %v", err) } go func() { - reg.Mux.Action(watch.Added, eventA) + reg.Broadcaster.Action(watch.Added, eventA) }() got := <-wi.ResultChan() if e, a := eventA, got.Object; !reflect.DeepEqual(e, a) { diff --git a/pkg/registry/registrytest/generic.go b/pkg/registry/registrytest/generic.go index 6142f90f7e..28a0d66b5e 100644 --- a/pkg/registry/registrytest/generic.go +++ b/pkg/registry/registrytest/generic.go @@ -33,13 +33,13 @@ type GenericRegistry struct { ObjectList runtime.Object sync.Mutex - Mux *watch.Mux + Broadcaster *watch.Broadcaster } func NewGeneric(list runtime.Object) *GenericRegistry { return &GenericRegistry{ - ObjectList: list, - Mux: watch.NewMux(0), + ObjectList: list, + Broadcaster: watch.NewBroadcaster(0), } } @@ -54,7 +54,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { // TODO: wire filter down into the mux; it needs access to current and previous state :( - return r.Mux.Watch(), nil + return r.Broadcaster.Watch(), nil } func (r *GenericRegistry) Get(ctx api.Context, id string) (runtime.Object, error) { @@ -67,7 +67,7 @@ func (r *GenericRegistry) Create(ctx api.Context, id string, obj runtime.Object) r.Lock() defer r.Unlock() r.Object = obj - r.Mux.Action(watch.Added, obj) + r.Broadcaster.Action(watch.Added, obj) return r.Err } @@ -75,13 +75,13 @@ func (r *GenericRegistry) Update(ctx api.Context, id string, obj runtime.Object) r.Lock() defer r.Unlock() r.Object = obj - r.Mux.Action(watch.Modified, obj) + r.Broadcaster.Action(watch.Modified, obj) return r.Err } func (r *GenericRegistry) Delete(ctx api.Context, id string) error { r.Lock() defer r.Unlock() - r.Mux.Action(watch.Deleted, r.Object) + r.Broadcaster.Action(watch.Deleted, r.Object) return r.Err } diff --git a/pkg/registry/registrytest/pod.go b/pkg/registry/registrytest/pod.go index 1ee58294fc..72046e1f0d 100644 --- a/pkg/registry/registrytest/pod.go +++ b/pkg/registry/registrytest/pod.go @@ -30,13 +30,13 @@ type PodRegistry struct { Pods *api.PodList sync.Mutex - mux *watch.Mux + broadcaster *watch.Broadcaster } func NewPodRegistry(pods *api.PodList) *PodRegistry { return &PodRegistry{ - Pods: pods, - mux: watch.NewMux(0), + Pods: pods, + broadcaster: watch.NewBroadcaster(0), } } @@ -64,8 +64,8 @@ func (r *PodRegistry) ListPods(ctx api.Context, selector labels.Selector) (*api. } func (r *PodRegistry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) { - // TODO: wire filter down into the mux; it needs access to current and previous state :( - return r.mux.Watch(), nil + // TODO: wire filter down into the broadcaster; it needs access to current and previous state :( + return r.broadcaster.Watch(), nil } func (r *PodRegistry) GetPod(ctx api.Context, podId string) (*api.Pod, error) { @@ -78,7 +78,7 @@ func (r *PodRegistry) CreatePod(ctx api.Context, pod *api.Pod) error { r.Lock() defer r.Unlock() r.Pod = pod - r.mux.Action(watch.Added, pod) + r.broadcaster.Action(watch.Added, pod) return r.Err } @@ -86,13 +86,13 @@ func (r *PodRegistry) UpdatePod(ctx api.Context, pod *api.Pod) error { r.Lock() defer r.Unlock() r.Pod = pod - r.mux.Action(watch.Modified, pod) + r.broadcaster.Action(watch.Modified, pod) return r.Err } func (r *PodRegistry) DeletePod(ctx api.Context, podId string) error { r.Lock() defer r.Unlock() - r.mux.Action(watch.Deleted, r.Pod) + r.broadcaster.Action(watch.Deleted, r.Pod) return r.Err } diff --git a/pkg/watch/mux.go b/pkg/watch/mux.go index 2d40014fb5..e4d8bef160 100644 --- a/pkg/watch/mux.go +++ b/pkg/watch/mux.go @@ -22,25 +22,25 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) -// Mux distributes event notifications among any number of watchers. Every event +// Broadcaster distributes event notifications among any number of watchers. Every event // is delivered to every watcher. -type Mux struct { +type Broadcaster struct { lock sync.Mutex - watchers map[int64]*muxWatcher + watchers map[int64]*broadcasterWatcher nextWatcher int64 incoming chan Event } -// NewMux creates a new Mux. queueLength is the maximum number of events to queue. +// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue. // When queueLength is 0, Action will block until any prior event has been // completely distributed. It is guaranteed that events will be distibuted in the // order in which they ocurr, but the order in which a single event is distributed // among all of the watchers is unspecified. -func NewMux(queueLength int) *Mux { - m := &Mux{ - watchers: map[int64]*muxWatcher{}, +func NewBroadcaster(queueLength int) *Broadcaster { + m := &Broadcaster{ + watchers: map[int64]*broadcasterWatcher{}, incoming: make(chan Event, queueLength), } go m.loop() @@ -50,12 +50,12 @@ func NewMux(queueLength int) *Mux { // Watch adds a new watcher to the list and returns an Interface for it. // Note: new watchers will only receive new events. They won't get an entire history // of previous events. -func (m *Mux) Watch() Interface { +func (m *Broadcaster) Watch() Interface { m.lock.Lock() defer m.lock.Unlock() id := m.nextWatcher m.nextWatcher++ - w := &muxWatcher{ + w := &broadcasterWatcher{ result: make(chan Event), stopped: make(chan struct{}), id: id, @@ -66,7 +66,7 @@ func (m *Mux) Watch() Interface { } // stopWatching stops the given watcher and removes it from the list. -func (m *Mux) stopWatching(id int64) { +func (m *Broadcaster) stopWatching(id int64) { m.lock.Lock() defer m.lock.Unlock() w, ok := m.watchers[id] @@ -79,7 +79,7 @@ func (m *Mux) stopWatching(id int64) { } // closeAll disconnects all watchers (presumably in response to a Shutdown call). -func (m *Mux) closeAll() { +func (m *Broadcaster) closeAll() { m.lock.Lock() defer m.lock.Unlock() for _, w := range m.watchers { @@ -87,24 +87,24 @@ func (m *Mux) closeAll() { } // Delete everything from the map, since presence/absence in the map is used // by stopWatching to avoid double-closing the channel. - m.watchers = map[int64]*muxWatcher{} + m.watchers = map[int64]*broadcasterWatcher{} } // Action distributes the given event among all watchers. -func (m *Mux) Action(action EventType, obj runtime.Object) { +func (m *Broadcaster) Action(action EventType, obj runtime.Object) { m.incoming <- Event{action, obj} } // Shutdown disconnects all watchers (but any queued events will still be distributed). // You must not call Action after calling Shutdown. -func (m *Mux) Shutdown() { +func (m *Broadcaster) Shutdown() { close(m.incoming) } // loop recieves from m.incoming and distributes to all watchers. -func (m *Mux) loop() { +func (m *Broadcaster) loop() { // Deliberately not catching crashes here. Yes, bring down the process if there's a - // bug in watch.Mux. + // bug in watch.Broadcaster. for { event, ok := <-m.incoming if !ok { @@ -116,7 +116,7 @@ func (m *Mux) loop() { } // distribute sends event to all watchers. Blocking. -func (m *Mux) distribute(event Event) { +func (m *Broadcaster) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() for _, w := range m.watchers { @@ -127,22 +127,22 @@ func (m *Mux) distribute(event Event) { } } -// muxWatcher handles a single watcher of a mux -type muxWatcher struct { +// broadcasterWatcher handles a single watcher of a broadcaster +type broadcasterWatcher struct { result chan Event stopped chan struct{} stop sync.Once id int64 - m *Mux + m *Broadcaster } // ResultChan returns a channel to use for waiting on events. -func (mw *muxWatcher) ResultChan() <-chan Event { +func (mw *broadcasterWatcher) ResultChan() <-chan Event { return mw.result } // Stop stops watching and removes mw from its list. -func (mw *muxWatcher) Stop() { +func (mw *broadcasterWatcher) Stop() { mw.stop.Do(func() { close(mw.stopped) mw.m.stopWatching(mw.id) diff --git a/pkg/watch/mux_test.go b/pkg/watch/mux_test.go index 11c6758b2c..8958461871 100644 --- a/pkg/watch/mux_test.go +++ b/pkg/watch/mux_test.go @@ -30,7 +30,7 @@ type myType struct { func (*myType) IsAnAPIObject() {} -func TestMux(t *testing.T) { +func TestBroadcaster(t *testing.T) { table := []Event{ {Added, &myType{"foo", "hello world 1"}}, {Added, &myType{"bar", "hello world 2"}}, @@ -38,8 +38,8 @@ func TestMux(t *testing.T) { {Deleted, &myType{"bar", "hello world 4"}}, } - // The mux we're testing - m := NewMux(0) + // The broadcaster we're testing + m := NewBroadcaster(0) // Add a bunch of watchers const testWatchers = 2 @@ -76,8 +76,8 @@ func TestMux(t *testing.T) { wg.Wait() } -func TestMuxWatcherClose(t *testing.T) { - m := NewMux(0) +func TestBroadcasterWatcherClose(t *testing.T) { + m := NewBroadcaster(0) w := m.Watch() w2 := m.Watch() w.Stop() @@ -93,11 +93,11 @@ func TestMuxWatcherClose(t *testing.T) { w2.Stop() } -func TestMuxWatcherStopDeadlock(t *testing.T) { +func TestBroadcasterWatcherStopDeadlock(t *testing.T) { done := make(chan bool) - m := NewMux(0) + m := NewBroadcaster(0) go func(w0, w1 Interface) { - // We know Mux is in the distribute loop once one watcher receives + // We know Broadcaster is in the distribute loop once one watcher receives // an event. Stop the other watcher while distribute is trying to // send to it. select {