FakeWatcher with channel size constructor

pull/6/head
Marcin Wielgus 2016-09-08 12:30:40 +02:00
parent aff7dfcaab
commit 13a80ce912
2 changed files with 34 additions and 32 deletions

View File

@ -44,7 +44,7 @@ func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) {
defer wd.Unlock()
wd.watchers = append(wd.watchers, watcher)
for _, event := range wd.eventsSoFar {
go watcher.Action(event.Type, event.Object)
watcher.Action(event.Type, event.Object)
}
}
@ -52,13 +52,11 @@ func (wd *WatcherDispatcher) register(watcher *watch.FakeWatcher) {
func (wd *WatcherDispatcher) Add(obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Added,
Object: obj,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Added, Object: obj})
for _, watcher := range wd.watchers {
go watcher.Add(obj)
if !watcher.IsStopped() {
watcher.Add(obj)
}
}
}
@ -66,13 +64,11 @@ func (wd *WatcherDispatcher) Add(obj runtime.Object) {
func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Modified,
Object: obj,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Modified, Object: obj})
for _, watcher := range wd.watchers {
go watcher.Modify(obj)
if !watcher.IsStopped() {
watcher.Modify(obj)
}
}
}
@ -80,13 +76,11 @@ func (wd *WatcherDispatcher) Modify(obj runtime.Object) {
func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Deleted,
Object: lastValue,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Deleted, Object: lastValue})
for _, watcher := range wd.watchers {
go watcher.Delete(lastValue)
if !watcher.IsStopped() {
watcher.Delete(lastValue)
}
}
}
@ -94,13 +88,11 @@ func (wd *WatcherDispatcher) Delete(lastValue runtime.Object) {
func (wd *WatcherDispatcher) Error(errValue runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: watch.Error,
Object: errValue,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: watch.Error, Object: errValue})
for _, watcher := range wd.watchers {
go watcher.Error(errValue)
if !watcher.IsStopped() {
watcher.Error(errValue)
}
}
}
@ -108,13 +100,11 @@ func (wd *WatcherDispatcher) Error(errValue runtime.Object) {
func (wd *WatcherDispatcher) Action(action watch.EventType, obj runtime.Object) {
wd.Lock()
defer wd.Unlock()
event := &watch.Event{
Type: action,
Object: obj,
}
wd.eventsSoFar = append(wd.eventsSoFar, event)
wd.eventsSoFar = append(wd.eventsSoFar, &watch.Event{Type: action, Object: obj})
for _, watcher := range wd.watchers {
go watcher.Action(action, obj)
if !watcher.IsStopped() {
watcher.Action(action, obj)
}
}
}
@ -127,7 +117,7 @@ func RegisterFakeWatch(resource string, client *core.Fake) *WatcherDispatcher {
}
client.AddWatchReactor(resource, func(action core.Action) (bool, watch.Interface, error) {
watcher := watch.NewFake()
watcher := watch.NewFakeWithChanSize(100)
dispatcher.register(watcher)
return true, watcher, nil
})

View File

@ -89,6 +89,12 @@ func NewFake() *FakeWatcher {
}
}
func NewFakeWithChanSize(size int) *FakeWatcher {
return &FakeWatcher{
result: make(chan Event, size),
}
}
// Stop implements Interface.Stop().
func (f *FakeWatcher) Stop() {
f.Lock()
@ -99,6 +105,12 @@ func (f *FakeWatcher) Stop() {
}
}
func (f *FakeWatcher) IsStopped() bool {
f.Lock()
defer f.Unlock()
return f.Stopped
}
// Reset prepares the watcher to be reused.
func (f *FakeWatcher) Reset() {
f.Lock()