From 64f1084c1a7b7b6830d264bb3c91aedb0c9d886b Mon Sep 17 00:00:00 2001 From: Eric Tune Date: Wed, 3 Dec 2014 23:54:40 -0800 Subject: [PATCH] Rename util.config.Watcher -> util.config.Broadcaster. Watch is a widely used term in the codebase, which doesn't capture the key feature of this type: broadcasting a change to several listeners. --- pkg/proxy/config/config.go | 26 +++++++++++++------------- pkg/util/config/config.go | 26 +++++++++++++------------- pkg/util/config/config_test.go | 12 ++++++------ 3 files changed, 32 insertions(+), 32 deletions(-) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index ecb7159cd9..269a672ec4 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -73,7 +73,7 @@ type EndpointsConfigHandler interface { // It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change. type EndpointsConfig struct { mux *config.Mux - watcher *config.Watcher + bcaster *config.Broadcaster store *endpointsStore } @@ -83,13 +83,13 @@ func NewEndpointsConfig() *EndpointsConfig { updates := make(chan struct{}) store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)} mux := config.NewMux(store) - watcher := config.NewWatcher() - go watchForUpdates(watcher, store, updates) - return &EndpointsConfig{mux, watcher, store} + bcaster := config.NewBroadcaster() + go watchForUpdates(bcaster, store, updates) + return &EndpointsConfig{mux, bcaster, store} } func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { - c.watcher.Add(config.ListenerFunc(func(instance interface{}) { + c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { handler.OnUpdate(instance.([]api.Endpoints)) })) } @@ -168,7 +168,7 @@ func (s *endpointsStore) MergedState() interface{} { // It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change. type ServiceConfig struct { mux *config.Mux - watcher *config.Watcher + bcaster *config.Broadcaster store *serviceStore } @@ -178,13 +178,13 @@ func NewServiceConfig() *ServiceConfig { updates := make(chan struct{}) store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)} mux := config.NewMux(store) - watcher := config.NewWatcher() - go watchForUpdates(watcher, store, updates) - return &ServiceConfig{mux, watcher, store} + bcaster := config.NewBroadcaster() + go watchForUpdates(bcaster, store, updates) + return &ServiceConfig{mux, bcaster, store} } func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { - c.watcher.Add(config.ListenerFunc(func(instance interface{}) { + c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { handler.OnUpdate(instance.([]api.Service)) })) } @@ -259,10 +259,10 @@ func (s *serviceStore) MergedState() interface{} { return services } -// watchForUpdates invokes watcher.Notify() with the latest version of an object +// watchForUpdates invokes bcaster.Notify() with the latest version of an object // when changes occur. -func watchForUpdates(watcher *config.Watcher, accessor config.Accessor, updates <-chan struct{}) { +func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { for _ = range updates { - watcher.Notify(accessor.MergedState()) + bcaster.Notify(accessor.MergedState()) } } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index f1d4c4bc9c..279ea621aa 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -110,30 +110,30 @@ func (f ListenerFunc) OnUpdate(instance interface{}) { f(instance) } -type Watcher struct { +type Broadcaster struct { // Listeners for changes and their lock. listenerLock sync.RWMutex listeners []Listener } -// NewWatcher registers a set of listeners that support the Listener interface -// and notify them on changes. -func NewWatcher() *Watcher { - return &Watcher{} +// NewBroadcaster registers a set of listeners that support the Listener interface +// and notifies them all on changes. +func NewBroadcaster() *Broadcaster { + return &Broadcaster{} } // Add registers listener to receive updates of changes. -func (m *Watcher) Add(listener Listener) { - m.listenerLock.Lock() - defer m.listenerLock.Unlock() - m.listeners = append(m.listeners, listener) +func (b *Broadcaster) Add(listener Listener) { + b.listenerLock.Lock() + defer b.listenerLock.Unlock() + b.listeners = append(b.listeners, listener) } // Notify notifies all listeners. -func (m *Watcher) Notify(instance interface{}) { - m.listenerLock.RLock() - listeners := m.listeners - m.listenerLock.RUnlock() +func (b *Broadcaster) Notify(instance interface{}) { + b.listenerLock.RLock() + listeners := b.listeners + b.listenerLock.RUnlock() for _, listener := range listeners { listener.OnUpdate(instance) } diff --git a/pkg/util/config/config_test.go b/pkg/util/config/config_test.go index fbf1a4b600..03d570292d 100644 --- a/pkg/util/config/config_test.go +++ b/pkg/util/config/config_test.go @@ -97,24 +97,24 @@ func TestSimultaneousMerge(t *testing.T) { <-ch } -func TestWatcher(t *testing.T) { - watch := NewWatcher() - watch.Notify(struct{}{}) +func TestBroadcaster(t *testing.T) { + b := NewBroadcaster() + b.Notify(struct{}{}) ch := make(chan bool, 2) - watch.Add(ListenerFunc(func(object interface{}) { + b.Add(ListenerFunc(func(object interface{}) { if object != "test" { t.Errorf("Expected %s, Got %s", "test", object) } ch <- true })) - watch.Add(ListenerFunc(func(object interface{}) { + b.Add(ListenerFunc(func(object interface{}) { if object != "test" { t.Errorf("Expected %s, Got %s", "test", object) } ch <- true })) - watch.Notify("test") + b.Notify("test") <-ch <-ch }