mirror of https://github.com/k3s-io/k3s
Fix RunUntil and stop leaking watch channel on etcd error
parent
6ef0f8c3c5
commit
5234c2dc82
|
@ -85,13 +85,13 @@ func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyn
|
||||||
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||||
// Run starts a goroutine and returns immediately.
|
// Run starts a goroutine and returns immediately.
|
||||||
func (r *Reflector) Run() {
|
func (r *Reflector) Run() {
|
||||||
go util.Forever(func() { r.listAndWatch() }, r.period)
|
go util.Forever(func() { r.listAndWatch(util.NeverStop) }, r.period)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
|
// RunUntil starts a watch and handles watch events. Will restart the watch if it is closed.
|
||||||
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
|
// RunUntil starts a goroutine and returns immediately. It will exit when stopCh is closed.
|
||||||
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
|
func (r *Reflector) RunUntil(stopCh <-chan struct{}) {
|
||||||
go util.Until(func() { r.listAndWatch() }, r.period, stopCh)
|
go util.Until(func() { r.listAndWatch(stopCh) }, r.period, stopCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -100,6 +100,10 @@ var (
|
||||||
|
|
||||||
// Used to indicate that watching stopped so that a resync could happen.
|
// Used to indicate that watching stopped so that a resync could happen.
|
||||||
errorResyncRequested = errors.New("resync channel fired")
|
errorResyncRequested = errors.New("resync channel fired")
|
||||||
|
|
||||||
|
// Used to indicate that watching stopped because of a signal from the stop
|
||||||
|
// channel passed in from a client of the reflector.
|
||||||
|
errorStopRequested = errors.New("Stop requested")
|
||||||
)
|
)
|
||||||
|
|
||||||
// resyncChan returns a channel which will receive something when a resync is required.
|
// resyncChan returns a channel which will receive something when a resync is required.
|
||||||
|
@ -110,9 +114,9 @@ func (r *Reflector) resyncChan() <-chan time.Time {
|
||||||
return time.After(r.resyncPeriod)
|
return time.After(r.resyncPeriod)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reflector) listAndWatch() {
|
func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
|
||||||
var resourceVersion string
|
var resourceVersion string
|
||||||
exitWatch := r.resyncChan()
|
resyncCh := r.resyncChan()
|
||||||
|
|
||||||
list, err := r.listerWatcher.List()
|
list, err := r.listerWatcher.List()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -149,9 +153,9 @@ func (r *Reflector) listAndWatch() {
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := r.watchHandler(w, &resourceVersion, exitWatch); err != nil {
|
if err := r.watchHandler(w, &resourceVersion, resyncCh, stopCh); err != nil {
|
||||||
if err != errorResyncRequested {
|
if err != errorResyncRequested {
|
||||||
glog.Errorf("watch of %v ended with error: %v", r.expectedType, err)
|
glog.Errorf("watch of %v ended with: %v", r.expectedType, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -169,14 +173,20 @@ func (r *Reflector) syncWith(items []runtime.Object) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// watchHandler watches w and keeps *resourceVersion up to date.
|
// watchHandler watches w and keeps *resourceVersion up to date.
|
||||||
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, exitWatch <-chan time.Time) error {
|
func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, resyncCh <-chan time.Time, stopCh <-chan struct{}) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
eventCount := 0
|
eventCount := 0
|
||||||
|
|
||||||
|
// Stopping the watcher should be idempotent and if we return from this function there's no way
|
||||||
|
// we're coming back in with the same watch interface.
|
||||||
|
defer w.Stop()
|
||||||
|
|
||||||
loop:
|
loop:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-exitWatch:
|
case <-stopCh:
|
||||||
w.Stop()
|
return errorStopRequested
|
||||||
|
case <-resyncCh:
|
||||||
return errorResyncRequested
|
return errorResyncRequested
|
||||||
case event, ok := <-w.ResultChan():
|
case event, ok := <-w.ResultChan():
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -37,6 +38,60 @@ func (t *testLW) Watch(resourceVersion string) (watch.Interface, error) {
|
||||||
return t.WatchFunc(resourceVersion)
|
return t.WatchFunc(resourceVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCloseWatchChannelOnError(t *testing.T) {
|
||||||
|
r := NewReflector(&testLW{}, &api.Pod{}, NewStore(MetaNamespaceKeyFunc), 0)
|
||||||
|
pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}}
|
||||||
|
fw := watch.NewFake()
|
||||||
|
r.listerWatcher = &testLW{
|
||||||
|
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func() (runtime.Object, error) {
|
||||||
|
return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
go r.listAndWatch(util.NeverStop)
|
||||||
|
fw.Error(pod)
|
||||||
|
select {
|
||||||
|
case _, ok := <-fw.ResultChan():
|
||||||
|
if ok {
|
||||||
|
t.Errorf("Watch channel left open after cancellation")
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Errorf("the cancellation is at least 99 milliseconds late")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRunUntil(t *testing.T) {
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
store := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
r := NewReflector(&testLW{}, &api.Pod{}, store, 0)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
r.listerWatcher = &testLW{
|
||||||
|
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||||
|
return fw, nil
|
||||||
|
},
|
||||||
|
ListFunc: func() (runtime.Object, error) {
|
||||||
|
return &api.PodList{ListMeta: api.ListMeta{ResourceVersion: "1"}}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
r.RunUntil(stopCh)
|
||||||
|
// Synchronously add a dummy pod into the watch channel so we
|
||||||
|
// know the RunUntil go routine is in the watch handler.
|
||||||
|
fw.Add(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})
|
||||||
|
stopCh <- struct{}{}
|
||||||
|
select {
|
||||||
|
case _, ok := <-fw.ResultChan():
|
||||||
|
if ok {
|
||||||
|
t.Errorf("Watch channel left open after stopping the watch")
|
||||||
|
}
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Errorf("the cancellation is at least 99 milliseconds late")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflector_resyncChan(t *testing.T) {
|
func TestReflector_resyncChan(t *testing.T) {
|
||||||
s := NewStore(MetaNamespaceKeyFunc)
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
|
g := NewReflector(&testLW{}, &api.Pod{}, s, time.Millisecond)
|
||||||
|
@ -57,7 +112,7 @@ func TestReflector_watchHandlerError(t *testing.T) {
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
err := g.watchHandler(fw, &resumeRV, neverExitWatch)
|
err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("unexpected non-error")
|
t.Errorf("unexpected non-error")
|
||||||
}
|
}
|
||||||
|
@ -77,7 +132,7 @@ func TestReflector_watchHandler(t *testing.T) {
|
||||||
fw.Stop()
|
fw.Stop()
|
||||||
}()
|
}()
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
err := g.watchHandler(fw, &resumeRV, neverExitWatch)
|
err := g.watchHandler(fw, &resumeRV, neverExitWatch, util.NeverStop)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error %v", err)
|
t.Errorf("unexpected error %v", err)
|
||||||
}
|
}
|
||||||
|
@ -126,12 +181,25 @@ func TestReflector_watchHandlerTimeout(t *testing.T) {
|
||||||
var resumeRV string
|
var resumeRV string
|
||||||
exit := make(chan time.Time, 1)
|
exit := make(chan time.Time, 1)
|
||||||
exit <- time.Now()
|
exit <- time.Now()
|
||||||
err := g.watchHandler(fw, &resumeRV, exit)
|
err := g.watchHandler(fw, &resumeRV, exit, util.NeverStop)
|
||||||
if err != errorResyncRequested {
|
if err != errorResyncRequested {
|
||||||
t.Errorf("expected timeout error, but got %q", err)
|
t.Errorf("expected timeout error, but got %q", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestReflectorStopWatch(t *testing.T) {
|
||||||
|
s := NewStore(MetaNamespaceKeyFunc)
|
||||||
|
g := NewReflector(&testLW{}, &api.Pod{}, s, 0)
|
||||||
|
fw := watch.NewFake()
|
||||||
|
var resumeRV string
|
||||||
|
stopWatch := make(chan struct{}, 1)
|
||||||
|
stopWatch <- struct{}{}
|
||||||
|
err := g.watchHandler(fw, &resumeRV, neverExitWatch, stopWatch)
|
||||||
|
if err != errorStopRequested {
|
||||||
|
t.Errorf("expected stop error, got %q", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestReflector_listAndWatch(t *testing.T) {
|
func TestReflector_listAndWatch(t *testing.T) {
|
||||||
createdFakes := make(chan *watch.FakeWatcher)
|
createdFakes := make(chan *watch.FakeWatcher)
|
||||||
|
|
||||||
|
@ -157,7 +225,7 @@ func TestReflector_listAndWatch(t *testing.T) {
|
||||||
}
|
}
|
||||||
s := NewFIFO(MetaNamespaceKeyFunc)
|
s := NewFIFO(MetaNamespaceKeyFunc)
|
||||||
r := NewReflector(lw, &api.Pod{}, s, 0)
|
r := NewReflector(lw, &api.Pod{}, s, 0)
|
||||||
go r.listAndWatch()
|
go r.listAndWatch(util.NeverStop)
|
||||||
|
|
||||||
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
ids := []string{"foo", "bar", "baz", "qux", "zoo"}
|
||||||
var fw *watch.FakeWatcher
|
var fw *watch.FakeWatcher
|
||||||
|
@ -274,6 +342,6 @@ func TestReflector_listAndWatchWithErrors(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
r := NewReflector(lw, &api.Pod{}, s, 0)
|
r := NewReflector(lw, &api.Pod{}, s, 0)
|
||||||
r.listAndWatch()
|
r.listAndWatch(util.NeverStop)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue