mirror of https://github.com/k3s-io/k3s
Merge pull request #38669 from caesarxuchao/fix-cacher
Automatic merge from submit-queue. Fix leaking goroutines in watch cache. Fixes #38670, which causes leaked goroutines in the API server. The root cause: if the WatchServer timeout [fires](https://github.com/kubernetes/kubernetes/blob/master/pkg/apiserver/watch.go#L187-L188) while the `result` channel is full, `sendWatchCacheEvent` stays blocked on its write to `result` even though `cacheWatcher.Stop()` has been [called](https://github.com/kubernetes/kubernetes/blob/master/pkg/apiserver/watch.go#L171), because WatchServer stops consuming the `result` channel after the timeout. Thanks to @krousey for identifying the problem. cc @mml @yujuhong
pull/6/head
commit
f02c8e47bc
|
@ -48,6 +48,7 @@ go_library(
|
|||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"cacher_whitebox_test.go",
|
||||
"selection_predicate_test.go",
|
||||
"time_budget_test.go",
|
||||
"util_test.go",
|
||||
|
|
|
@ -750,6 +750,7 @@ type cacheWatcher struct {
|
|||
sync.Mutex
|
||||
input chan watchCacheEvent
|
||||
result chan watch.Event
|
||||
done chan struct{}
|
||||
filter watchFilterFunc
|
||||
stopped bool
|
||||
forget func(bool)
|
||||
|
@ -759,6 +760,7 @@ func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []watchCac
|
|||
watcher := &cacheWatcher{
|
||||
input: make(chan watchCacheEvent, chanSize),
|
||||
result: make(chan watch.Event, chanSize),
|
||||
done: make(chan struct{}),
|
||||
filter: filter,
|
||||
stopped: false,
|
||||
forget: forget,
|
||||
|
@ -783,6 +785,7 @@ func (c *cacheWatcher) stop() {
|
|||
defer c.Unlock()
|
||||
if !c.stopped {
|
||||
c.stopped = true
|
||||
close(c.done)
|
||||
close(c.input)
|
||||
}
|
||||
}
|
||||
|
@ -847,13 +850,19 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
|||
glog.Errorf("unexpected copy error: %v", err)
|
||||
return
|
||||
}
|
||||
var watchEvent watch.Event
|
||||
switch {
|
||||
case curObjPasses && !oldObjPasses:
|
||||
c.result <- watch.Event{Type: watch.Added, Object: object}
|
||||
watchEvent = watch.Event{Type: watch.Added, Object: object}
|
||||
case curObjPasses && oldObjPasses:
|
||||
c.result <- watch.Event{Type: watch.Modified, Object: object}
|
||||
watchEvent = watch.Event{Type: watch.Modified, Object: object}
|
||||
case !curObjPasses && oldObjPasses:
|
||||
c.result <- watch.Event{Type: watch.Deleted, Object: object}
|
||||
watchEvent = watch.Event{Type: watch.Deleted, Object: object}
|
||||
}
|
||||
select {
|
||||
case c.result <- watchEvent:
|
||||
// don't block on c.result if c.done is closed
|
||||
case <-c.done:
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package storage
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
)
|
||||
|
||||
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
||||
// the writes to cacheWatcher.result channel is blocked.
|
||||
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||
var lock sync.RWMutex
|
||||
count := 0
|
||||
filter := func(string, labels.Set, fields.Set) bool { return true }
|
||||
forget := func(bool) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
count++
|
||||
}
|
||||
initEvents := []watchCacheEvent{
|
||||
{Object: &api.Pod{}},
|
||||
{Object: &api.Pod{}},
|
||||
}
|
||||
// set the size of the buffer of w.result to 0, so that the writes to
|
||||
// w.result is blocked.
|
||||
w := newCacheWatcher(0, 0, initEvents, filter, forget)
|
||||
w.Stop()
|
||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||
lock.RLock()
|
||||
defer lock.RUnlock()
|
||||
return count == 2, nil
|
||||
}); err != nil {
|
||||
t.Fatalf("expected forget() to be called twice, because sendWatchCacheEvent should not be blocked by the result channel: %v", err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue