mirror of https://github.com/k3s-io/k3s
Rename util.config.Watcher -> util.config.Broadcaster.
Watch is a widely used term in the codebase, which doesn't capture the key feature of this type: broadcasting a change to several listeners.pull/6/head
parent
c31b3f04de
commit
64f1084c1a
|
@ -73,7 +73,7 @@ type EndpointsConfigHandler interface {
|
||||||
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
|
// It accepts "set", "add" and "remove" operations of endpoints via channels, and invokes registered handlers on change.
|
||||||
type EndpointsConfig struct {
|
type EndpointsConfig struct {
|
||||||
mux *config.Mux
|
mux *config.Mux
|
||||||
watcher *config.Watcher
|
bcaster *config.Broadcaster
|
||||||
store *endpointsStore
|
store *endpointsStore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,13 +83,13 @@ func NewEndpointsConfig() *EndpointsConfig {
|
||||||
updates := make(chan struct{})
|
updates := make(chan struct{})
|
||||||
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)}
|
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)}
|
||||||
mux := config.NewMux(store)
|
mux := config.NewMux(store)
|
||||||
watcher := config.NewWatcher()
|
bcaster := config.NewBroadcaster()
|
||||||
go watchForUpdates(watcher, store, updates)
|
go watchForUpdates(bcaster, store, updates)
|
||||||
return &EndpointsConfig{mux, watcher, store}
|
return &EndpointsConfig{mux, bcaster, store}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
|
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
|
||||||
c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
|
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
|
||||||
handler.OnUpdate(instance.([]api.Endpoints))
|
handler.OnUpdate(instance.([]api.Endpoints))
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -168,7 +168,7 @@ func (s *endpointsStore) MergedState() interface{} {
|
||||||
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
|
// It accepts "set", "add" and "remove" operations of services via channels, and invokes registered handlers on change.
|
||||||
type ServiceConfig struct {
|
type ServiceConfig struct {
|
||||||
mux *config.Mux
|
mux *config.Mux
|
||||||
watcher *config.Watcher
|
bcaster *config.Broadcaster
|
||||||
store *serviceStore
|
store *serviceStore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,13 +178,13 @@ func NewServiceConfig() *ServiceConfig {
|
||||||
updates := make(chan struct{})
|
updates := make(chan struct{})
|
||||||
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
|
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
|
||||||
mux := config.NewMux(store)
|
mux := config.NewMux(store)
|
||||||
watcher := config.NewWatcher()
|
bcaster := config.NewBroadcaster()
|
||||||
go watchForUpdates(watcher, store, updates)
|
go watchForUpdates(bcaster, store, updates)
|
||||||
return &ServiceConfig{mux, watcher, store}
|
return &ServiceConfig{mux, bcaster, store}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
|
func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) {
|
||||||
c.watcher.Add(config.ListenerFunc(func(instance interface{}) {
|
c.bcaster.Add(config.ListenerFunc(func(instance interface{}) {
|
||||||
handler.OnUpdate(instance.([]api.Service))
|
handler.OnUpdate(instance.([]api.Service))
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -259,10 +259,10 @@ func (s *serviceStore) MergedState() interface{} {
|
||||||
return services
|
return services
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchForUpdates invokes watcher.Notify() with the latest version of an object
|
// watchForUpdates invokes bcaster.Notify() with the latest version of an object
|
||||||
// when changes occur.
|
// when changes occur.
|
||||||
func watchForUpdates(watcher *config.Watcher, accessor config.Accessor, updates <-chan struct{}) {
|
func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) {
|
||||||
for _ = range updates {
|
for _ = range updates {
|
||||||
watcher.Notify(accessor.MergedState())
|
bcaster.Notify(accessor.MergedState())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,30 +110,30 @@ func (f ListenerFunc) OnUpdate(instance interface{}) {
|
||||||
f(instance)
|
f(instance)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Watcher struct {
|
type Broadcaster struct {
|
||||||
// Listeners for changes and their lock.
|
// Listeners for changes and their lock.
|
||||||
listenerLock sync.RWMutex
|
listenerLock sync.RWMutex
|
||||||
listeners []Listener
|
listeners []Listener
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWatcher registers a set of listeners that support the Listener interface
|
// NewBroadcaster registers a set of listeners that support the Listener interface
|
||||||
// and notify them on changes.
|
// and notifies them all on changes.
|
||||||
func NewWatcher() *Watcher {
|
func NewBroadcaster() *Broadcaster {
|
||||||
return &Watcher{}
|
return &Broadcaster{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add registers listener to receive updates of changes.
|
// Add registers listener to receive updates of changes.
|
||||||
func (m *Watcher) Add(listener Listener) {
|
func (b *Broadcaster) Add(listener Listener) {
|
||||||
m.listenerLock.Lock()
|
b.listenerLock.Lock()
|
||||||
defer m.listenerLock.Unlock()
|
defer b.listenerLock.Unlock()
|
||||||
m.listeners = append(m.listeners, listener)
|
b.listeners = append(b.listeners, listener)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify notifies all listeners.
|
// Notify notifies all listeners.
|
||||||
func (m *Watcher) Notify(instance interface{}) {
|
func (b *Broadcaster) Notify(instance interface{}) {
|
||||||
m.listenerLock.RLock()
|
b.listenerLock.RLock()
|
||||||
listeners := m.listeners
|
listeners := b.listeners
|
||||||
m.listenerLock.RUnlock()
|
b.listenerLock.RUnlock()
|
||||||
for _, listener := range listeners {
|
for _, listener := range listeners {
|
||||||
listener.OnUpdate(instance)
|
listener.OnUpdate(instance)
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,24 +97,24 @@ func TestSimultaneousMerge(t *testing.T) {
|
||||||
<-ch
|
<-ch
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWatcher(t *testing.T) {
|
func TestBroadcaster(t *testing.T) {
|
||||||
watch := NewWatcher()
|
b := NewBroadcaster()
|
||||||
watch.Notify(struct{}{})
|
b.Notify(struct{}{})
|
||||||
|
|
||||||
ch := make(chan bool, 2)
|
ch := make(chan bool, 2)
|
||||||
watch.Add(ListenerFunc(func(object interface{}) {
|
b.Add(ListenerFunc(func(object interface{}) {
|
||||||
if object != "test" {
|
if object != "test" {
|
||||||
t.Errorf("Expected %s, Got %s", "test", object)
|
t.Errorf("Expected %s, Got %s", "test", object)
|
||||||
}
|
}
|
||||||
ch <- true
|
ch <- true
|
||||||
}))
|
}))
|
||||||
watch.Add(ListenerFunc(func(object interface{}) {
|
b.Add(ListenerFunc(func(object interface{}) {
|
||||||
if object != "test" {
|
if object != "test" {
|
||||||
t.Errorf("Expected %s, Got %s", "test", object)
|
t.Errorf("Expected %s, Got %s", "test", object)
|
||||||
}
|
}
|
||||||
ch <- true
|
ch <- true
|
||||||
}))
|
}))
|
||||||
watch.Notify("test")
|
b.Notify("test")
|
||||||
<-ch
|
<-ch
|
||||||
<-ch
|
<-ch
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue