Merge pull request #16947 from lavalamp/wojtek-t-timeout_watchers

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-11-07 00:31:13 -08:00
commit c5ca43f4bb
2 changed files with 45 additions and 8 deletions

View File

@ -22,6 +22,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
@ -352,10 +353,12 @@ func (c *Cacher) terminateAllWatchers() {
}
}
func forgetWatcher(c *Cacher, index int) func() {
return func() {
c.Lock()
defer c.Unlock()
func forgetWatcher(c *Cacher, index int) func(bool) {
return func(lock bool) {
if lock {
c.Lock()
defer c.Unlock()
}
// It's possible that the watcher is already not in the map (e.g. in case of
// simulaneous Stop() and terminateAllWatchers(), but it doesn't break anything.
delete(c.watchers, index)
@ -428,10 +431,10 @@ type cacheWatcher struct {
result chan watch.Event
filter FilterFunc
stopped bool
forget func()
forget func(bool)
}
func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func()) *cacheWatcher {
func newCacheWatcher(initEvents []watchCacheEvent, filter FilterFunc, forget func(bool)) *cacheWatcher {
watcher := &cacheWatcher{
input: make(chan watchCacheEvent, 10),
result: make(chan watch.Event, 10),
@ -450,7 +453,7 @@ func (c *cacheWatcher) ResultChan() <-chan watch.Event {
// Implements watch.Interface.
func (c *cacheWatcher) Stop() {
c.forget()
c.forget(true)
c.stop()
}
@ -464,7 +467,15 @@ func (c *cacheWatcher) stop() {
}
func (c *cacheWatcher) add(event watchCacheEvent) {
c.input <- event
select {
case c.input <- event:
case <-time.After(5 * time.Second):
// This means that we couldn't send event to that watcher.
// Since we don't want to blockin on it infinitely,
// we simply terminate it.
c.forget(false)
c.stop()
}
}
func (c *cacheWatcher) sendWatchCacheEvent(event watchCacheEvent) {

View File

@ -223,6 +223,32 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, nowWatcher, watch.Modified, podFooBis)
}
func TestWatcherTimeout(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)
cacher := newTestCacher(etcdStorage)
// Create a watcher that will not be reading any result.
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()
// Create a second watcher that will be reading result.
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer readingWatcher.Stop()
for i := 1; i <= 22; i++ {
pod := makeTestPod(strconv.Itoa(i))
_ = updatePod(t, etcdStorage, pod, nil)
verifyWatchEvent(t, readingWatcher, watch.Added, pod)
}
}
func TestFiltering(t *testing.T) {
server, etcdStorage := newEtcdTestStorage(t, testapi.Default.Codec(), etcdtest.PathPrefix())
defer server.Terminate(t)