mirror of https://github.com/k3s-io/k3s
add object type to cacheWatcher
parent
f0a4672e6d
commit
6c6d472039
|
@ -443,7 +443,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
|
|||
// given that memory allocation may trigger GC and block the thread.
|
||||
// Also note that emptyFunc is a placeholder, until we will be able
|
||||
// to compute watcher.forget function (which has to happen under lock).
|
||||
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks)
|
||||
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
|
||||
|
||||
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
||||
// no new events will be processed in the meantime. The watchCache will be unlocked
|
||||
|
@ -1033,9 +1033,11 @@ type cacheWatcher struct {
|
|||
// save it here to send bookmark events before that.
|
||||
deadline time.Time
|
||||
allowWatchBookmarks bool
|
||||
// Object type of the cache watcher interests
|
||||
objectType reflect.Type
|
||||
}
|
||||
|
||||
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool) *cacheWatcher {
|
||||
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type) *cacheWatcher {
|
||||
return &cacheWatcher{
|
||||
input: make(chan *watchCacheEvent, chanSize),
|
||||
result: make(chan watch.Event, chanSize),
|
||||
|
@ -1046,6 +1048,7 @@ func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), ve
|
|||
versioner: versioner,
|
||||
deadline: deadline,
|
||||
allowWatchBookmarks: allowWatchBookmarks,
|
||||
objectType: objectType,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1208,16 +1211,12 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven
|
|||
for _, event := range initEvents {
|
||||
c.sendWatchCacheEvent(event)
|
||||
}
|
||||
objType := c.objectType.String()
|
||||
if len(initEvents) > 0 {
|
||||
objType := reflect.TypeOf(initEvents[0].Object).String()
|
||||
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
|
||||
}
|
||||
processingTime := time.Since(startTime)
|
||||
if processingTime > initProcessThreshold {
|
||||
objType := "<null>"
|
||||
if len(initEvents) > 0 {
|
||||
objType = reflect.TypeOf(initEvents[0].Object).String()
|
||||
}
|
||||
klog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
|
||||
}
|
||||
|
||||
|
|
|
@ -45,6 +45,10 @@ import (
|
|||
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
|
||||
)
|
||||
|
||||
var (
|
||||
objectType = reflect.TypeOf(&v1.Pod{})
|
||||
)
|
||||
|
||||
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
|
||||
// the writes to cacheWatcher.result channel is blocked.
|
||||
func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
||||
|
@ -67,7 +71,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) {
|
|||
}
|
||||
// set the size of the buffer of w.result to 0, so that the writes to
|
||||
// w.result is blocked.
|
||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
|
||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
|
||||
go w.process(context.Background(), initEvents, 0)
|
||||
w.Stop()
|
||||
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
|
||||
|
@ -187,7 +191,7 @@ TestCase:
|
|||
testCase.events[j].ResourceVersion = uint64(j) + 1
|
||||
}
|
||||
|
||||
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
|
||||
w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
|
||||
go w.process(context.Background(), testCase.events, 0)
|
||||
|
||||
ch := w.ResultChan()
|
||||
|
@ -466,7 +470,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||
// timeout to zero and run the Stop goroutine concurrently.
|
||||
// May sure that the watch will not be blocked on Stop.
|
||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false)
|
||||
w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType)
|
||||
go w.Stop()
|
||||
select {
|
||||
case <-done:
|
||||
|
@ -478,7 +482,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) {
|
|||
deadline := time.Now().Add(time.Hour)
|
||||
// After that, verifies the cacheWatcher.process goroutine works correctly.
|
||||
for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ {
|
||||
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false)
|
||||
w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType)
|
||||
w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)}
|
||||
ctx, _ := context.WithDeadline(context.Background(), deadline)
|
||||
go w.process(ctx, nil, 0)
|
||||
|
@ -498,7 +502,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) {
|
|||
forget := func() {}
|
||||
|
||||
newWatcher := func(deadline time.Time) *cacheWatcher {
|
||||
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true)
|
||||
return newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, objectType)
|
||||
}
|
||||
|
||||
clock := clock.NewFakeClock(time.Now())
|
||||
|
|
Loading…
Reference in New Issue