mirror of https://github.com/k3s-io/k3s
1275 lines
42 KiB
Go
1275 lines
42 KiB
Go
/*
|
|
Copyright 2015 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package cacher
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/http"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"k8s.io/klog"
|
|
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/conversion"
|
|
"k8s.io/apimachinery/pkg/fields"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/apiserver/pkg/features"
|
|
"k8s.io/apiserver/pkg/storage"
|
|
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
|
"k8s.io/client-go/tools/cache"
|
|
utiltrace "k8s.io/utils/trace"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
)
|
|
|
|
var (
|
|
initCounter = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "apiserver_init_events_total",
|
|
Help: "Counter of init events processed in watchcache broken by resource type",
|
|
},
|
|
[]string{"resource"},
|
|
)
|
|
emptyFunc = func() {}
|
|
)
|
|
|
|
const (
|
|
// storageWatchListPageSize is the cacher's request chunk size of
|
|
// initial and resync watch lists to storage.
|
|
storageWatchListPageSize = int64(10000)
|
|
)
|
|
|
|
func init() {
|
|
prometheus.MustRegister(initCounter)
|
|
}
|
|
|
|
// Config contains the configuration for a given Cache.
|
|
type Config struct {
|
|
// Maximum size of the history cached in memory.
|
|
CacheCapacity int
|
|
|
|
// An underlying storage.Interface.
|
|
Storage storage.Interface
|
|
|
|
// An underlying storage.Versioner.
|
|
Versioner storage.Versioner
|
|
|
|
// The Cache will be caching objects of a given Type and assumes that they
|
|
// are all stored under ResourcePrefix directory in the underlying database.
|
|
ResourcePrefix string
|
|
|
|
// KeyFunc is used to get a key in the underlying storage for a given object.
|
|
KeyFunc func(runtime.Object) (string, error)
|
|
|
|
// GetAttrsFunc is used to get object labels, fields
|
|
GetAttrsFunc func(runtime.Object) (label labels.Set, field fields.Set, err error)
|
|
|
|
// TriggerPublisherFunc is used for optimizing amount of watchers that
|
|
// needs to process an incoming event.
|
|
TriggerPublisherFunc storage.TriggerPublisherFunc
|
|
|
|
// NewFunc is a function that creates new empty object storing a object of type Type.
|
|
NewFunc func() runtime.Object
|
|
|
|
// NewList is a function that creates new empty object storing a list of
|
|
// objects of type Type.
|
|
NewListFunc func() runtime.Object
|
|
|
|
Codec runtime.Codec
|
|
}
|
|
|
|
type watchersMap map[int]*cacheWatcher
|
|
|
|
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
|
|
wm[number] = w
|
|
}
|
|
|
|
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
|
|
if watcher, ok := wm[number]; ok {
|
|
delete(wm, number)
|
|
done(watcher)
|
|
}
|
|
}
|
|
|
|
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
|
|
for key, watcher := range wm {
|
|
delete(wm, key)
|
|
done(watcher)
|
|
}
|
|
}
|
|
|
|
type indexedWatchers struct {
|
|
allWatchers watchersMap
|
|
valueWatchers map[string]watchersMap
|
|
}
|
|
|
|
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
|
|
if supported {
|
|
if _, ok := i.valueWatchers[value]; !ok {
|
|
i.valueWatchers[value] = watchersMap{}
|
|
}
|
|
i.valueWatchers[value].addWatcher(w, number)
|
|
} else {
|
|
i.allWatchers.addWatcher(w, number)
|
|
}
|
|
}
|
|
|
|
func (i *indexedWatchers) deleteWatcher(number int, value string, supported bool, done func(*cacheWatcher)) {
|
|
if supported {
|
|
i.valueWatchers[value].deleteWatcher(number, done)
|
|
if len(i.valueWatchers[value]) == 0 {
|
|
delete(i.valueWatchers, value)
|
|
}
|
|
} else {
|
|
i.allWatchers.deleteWatcher(number, done)
|
|
}
|
|
}
|
|
|
|
func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cacheWatcher)) {
|
|
if len(i.allWatchers) > 0 || len(i.valueWatchers) > 0 {
|
|
klog.Warningf("Terminating all watchers from cacher %v", objectType)
|
|
}
|
|
i.allWatchers.terminateAll(done)
|
|
for index, watchers := range i.valueWatchers {
|
|
watchers.terminateAll(done)
|
|
delete(i.valueWatchers, index)
|
|
}
|
|
}
|
|
|
|
// As we don't need a high precision here, we keep all watchers timeout within a
|
|
// second in a bucket, and pop up them once at the timeout. To be more specific,
|
|
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
|
|
// This is NOT thread-safe.
|
|
type watcherBookmarkTimeBuckets struct {
|
|
watchersBuckets map[int64][]*cacheWatcher
|
|
startBucketID int64
|
|
clock clock.Clock
|
|
}
|
|
|
|
func newTimeBucketWatchers(clock clock.Clock) *watcherBookmarkTimeBuckets {
|
|
return &watcherBookmarkTimeBuckets{
|
|
watchersBuckets: make(map[int64][]*cacheWatcher),
|
|
startBucketID: clock.Now().Unix(),
|
|
clock: clock,
|
|
}
|
|
}
|
|
|
|
// adds a watcher to the bucket, if the deadline is before the start, it will be
|
|
// added to the first one.
|
|
func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool {
|
|
nextTime, ok := w.nextBookmarkTime(t.clock.Now())
|
|
if !ok {
|
|
return false
|
|
}
|
|
bucketID := nextTime.Unix()
|
|
if bucketID < t.startBucketID {
|
|
bucketID = t.startBucketID
|
|
}
|
|
watchers, _ := t.watchersBuckets[bucketID]
|
|
t.watchersBuckets[bucketID] = append(watchers, w)
|
|
return true
|
|
}
|
|
|
|
func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher {
|
|
currentBucketID := t.clock.Now().Unix()
|
|
// There should be one or two elements in almost all cases
|
|
expiredWatchers := make([][]*cacheWatcher, 0, 2)
|
|
for ; t.startBucketID <= currentBucketID; t.startBucketID++ {
|
|
if watchers, ok := t.watchersBuckets[t.startBucketID]; ok {
|
|
delete(t.watchersBuckets, t.startBucketID)
|
|
expiredWatchers = append(expiredWatchers, watchers)
|
|
}
|
|
}
|
|
return expiredWatchers
|
|
}
|
|
|
|
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool
|
|
|
|
// Cacher is responsible for serving WATCH and LIST requests for a given
|
|
// resource from its internal cache and updating its cache in the background
|
|
// based on the underlying storage contents.
|
|
// Cacher implements storage.Interface (although most of the calls are just
|
|
// delegated to the underlying storage).
|
|
type Cacher struct {
|
|
// HighWaterMarks for performance debugging.
|
|
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
|
|
// See: https://golang.org/pkg/sync/atomic/ for more information
|
|
incomingHWM storage.HighWaterMark
|
|
// Incoming events that should be dispatched to watchers.
|
|
incoming chan watchCacheEvent
|
|
|
|
sync.RWMutex
|
|
|
|
// Before accessing the cacher's cache, wait for the ready to be ok.
|
|
// This is necessary to prevent users from accessing structures that are
|
|
// uninitialized or are being repopulated right now.
|
|
// ready needs to be set to false when the cacher is paused or stopped.
|
|
// ready needs to be set to true when the cacher is ready to use after
|
|
// initialization.
|
|
ready *ready
|
|
|
|
// Underlying storage.Interface.
|
|
storage storage.Interface
|
|
|
|
// Expected type of objects in the underlying cache.
|
|
objectType reflect.Type
|
|
|
|
// "sliding window" of recent changes of objects and the current state.
|
|
watchCache *watchCache
|
|
reflector *cache.Reflector
|
|
|
|
// Versioner is used to handle resource versions.
|
|
versioner storage.Versioner
|
|
|
|
// newFunc is a function that creates new empty object storing a object of type Type.
|
|
newFunc func() runtime.Object
|
|
|
|
// triggerFunc is used for optimizing amount of watchers that needs to process
|
|
// an incoming event.
|
|
triggerFunc storage.TriggerPublisherFunc
|
|
// watchers is mapping from the value of trigger function that a
|
|
// watcher is interested into the watchers
|
|
watcherIdx int
|
|
watchers indexedWatchers
|
|
|
|
// Defines a time budget that can be spend on waiting for not-ready watchers
|
|
// while dispatching event before shutting them down.
|
|
dispatchTimeoutBudget *timeBudget
|
|
|
|
// Handling graceful termination.
|
|
stopLock sync.RWMutex
|
|
stopped bool
|
|
stopCh chan struct{}
|
|
stopWg sync.WaitGroup
|
|
|
|
clock clock.Clock
|
|
// timer is used to avoid unnecessary allocations in underlying watchers.
|
|
timer *time.Timer
|
|
|
|
// dispatching determines whether there is currently dispatching of
|
|
// any event in flight.
|
|
dispatching bool
|
|
// watchersBuffer is a list of watchers potentially interested in currently
|
|
// dispatched event.
|
|
watchersBuffer []*cacheWatcher
|
|
// watchersToStop is a list of watchers that were supposed to be stopped
|
|
// during current dispatching, but stopping was deferred to the end of
|
|
// dispatching that event to avoid race with closing channels in watchers.
|
|
watchersToStop []*cacheWatcher
|
|
// Maintain a timeout queue to send the bookmark event before the watcher times out.
|
|
bookmarkWatchers *watcherBookmarkTimeBuckets
|
|
// watchBookmark feature-gate
|
|
watchBookmarkEnabled bool
|
|
}
|
|
|
|
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
|
|
// its internal cache and updating its cache in the background based on the
|
|
// given configuration.
|
|
func NewCacherFromConfig(config Config) *Cacher {
|
|
stopCh := make(chan struct{})
|
|
obj := config.NewFunc()
|
|
// Give this error when it is constructed rather than when you get the
|
|
// first watch item, because it's much easier to track down that way.
|
|
if err := runtime.CheckCodec(config.Codec, obj); err != nil {
|
|
panic("storage codec doesn't seem to match given type: " + err.Error())
|
|
}
|
|
|
|
clock := clock.RealClock{}
|
|
cacher := &Cacher{
|
|
ready: newReady(),
|
|
storage: config.Storage,
|
|
objectType: reflect.TypeOf(obj),
|
|
versioner: config.Versioner,
|
|
newFunc: config.NewFunc,
|
|
triggerFunc: config.TriggerPublisherFunc,
|
|
watcherIdx: 0,
|
|
watchers: indexedWatchers{
|
|
allWatchers: make(map[int]*cacheWatcher),
|
|
valueWatchers: make(map[string]watchersMap),
|
|
},
|
|
// TODO: Figure out the correct value for the buffer size.
|
|
incoming: make(chan watchCacheEvent, 100),
|
|
dispatchTimeoutBudget: newTimeBudget(stopCh),
|
|
// We need to (potentially) stop both:
|
|
// - wait.Until go-routine
|
|
// - reflector.ListAndWatch
|
|
// and there are no guarantees on the order that they will stop.
|
|
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
|
|
stopCh: stopCh,
|
|
clock: clock,
|
|
timer: time.NewTimer(time.Duration(0)),
|
|
bookmarkWatchers: newTimeBucketWatchers(clock),
|
|
watchBookmarkEnabled: utilfeature.DefaultFeatureGate.Enabled(features.WatchBookmark),
|
|
}
|
|
|
|
// Ensure that timer is stopped.
|
|
if !cacher.timer.Stop() {
|
|
// Consume triggered (but not yet received) timer event
|
|
// so that future reuse does not get a spurious timeout.
|
|
<-cacher.timer.C
|
|
}
|
|
|
|
watchCache := newWatchCache(
|
|
config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner)
|
|
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
|
|
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
|
|
|
|
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
|
|
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
|
|
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
|
|
reflector.WatchListPageSize = storageWatchListPageSize
|
|
|
|
cacher.watchCache = watchCache
|
|
cacher.reflector = reflector
|
|
|
|
go cacher.dispatchEvents()
|
|
|
|
cacher.stopWg.Add(1)
|
|
go func() {
|
|
defer cacher.stopWg.Done()
|
|
defer cacher.terminateAllWatchers()
|
|
wait.Until(
|
|
func() {
|
|
if !cacher.isStopped() {
|
|
cacher.startCaching(stopCh)
|
|
}
|
|
}, time.Second, stopCh,
|
|
)
|
|
}()
|
|
|
|
return cacher
|
|
}
|
|
|
|
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
|
|
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
|
|
// It is safe to use the cache after a successful list until a disconnection.
|
|
// We start with usable (write) locked. The below OnReplace function will
|
|
// unlock it after a successful list. The below defer will then re-lock
|
|
// it when this function exits (always due to disconnection), only if
|
|
// we actually got a successful list. This cycle will repeat as needed.
|
|
successfulList := false
|
|
c.watchCache.SetOnReplace(func() {
|
|
successfulList = true
|
|
c.ready.set(true)
|
|
})
|
|
defer func() {
|
|
if successfulList {
|
|
c.ready.set(false)
|
|
}
|
|
}()
|
|
|
|
c.terminateAllWatchers()
|
|
// Note that since onReplace may be not called due to errors, we explicitly
|
|
// need to retry it on errors under lock.
|
|
// Also note that startCaching is called in a loop, so there's no need
|
|
// to have another loop here.
|
|
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
|
|
klog.Errorf("unexpected ListAndWatch error: %v", err)
|
|
}
|
|
}
|
|
|
|
// Versioner implements storage.Interface.
|
|
func (c *Cacher) Versioner() storage.Versioner {
|
|
return c.storage.Versioner()
|
|
}
|
|
|
|
// Create implements storage.Interface.
|
|
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
|
return c.storage.Create(ctx, key, obj, out, ttl)
|
|
}
|
|
|
|
// Delete implements storage.Interface.
|
|
func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
|
|
return c.storage.Delete(ctx, key, out, preconditions, validateDeletion)
|
|
}
|
|
|
|
// Watch implements storage.Interface.
|
|
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
|
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.ready.wait()
|
|
|
|
triggerValue, triggerSupported := "", false
|
|
// TODO: Currently we assume that in a given Cacher object, any <predicate> that is
|
|
// passed here is aware of exactly the same trigger (at most one).
|
|
// Thus, either 0 or 1 values will be returned.
|
|
if matchValues := pred.MatcherIndex(); len(matchValues) > 0 {
|
|
triggerValue, triggerSupported = matchValues[0].Value, true
|
|
}
|
|
|
|
// If there is triggerFunc defined, but triggerSupported is false,
|
|
// we can't narrow the amount of events significantly at this point.
|
|
//
|
|
// That said, currently triggerFunc is defined only for Pods and Nodes,
|
|
// and there is only constant number of watchers for which triggerSupported
|
|
// is false (excluding those issues explicitly by users).
|
|
// Thus, to reduce the risk of those watchers blocking all watchers of a
|
|
// given resource in the system, we increase the sizes of buffers for them.
|
|
chanSize := 10
|
|
if c.triggerFunc != nil && !triggerSupported {
|
|
// TODO: We should tune this value and ideally make it dependent on the
|
|
// number of objects of a given type and/or their churn.
|
|
chanSize = 1000
|
|
}
|
|
|
|
// Determine watch timeout('0' means deadline is not set, ignore checking)
|
|
deadline, _ := ctx.Deadline()
|
|
// Create a watcher here to reduce memory allocations under lock,
|
|
// given that memory allocation may trigger GC and block the thread.
|
|
// Also note that emptyFunc is a placeholder, until we will be able
|
|
// to compute watcher.forget function (which has to happen under lock).
|
|
watcher := newCacheWatcher(chanSize, filterWithAttrsFunction(key, pred), emptyFunc, c.versioner, deadline, pred.AllowWatchBookmarks, c.objectType)
|
|
|
|
// We explicitly use thread unsafe version and do locking ourself to ensure that
|
|
// no new events will be processed in the meantime. The watchCache will be unlocked
|
|
// on return from this function.
|
|
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
|
|
// underlying watchCache is calling processEvent under its lock.
|
|
c.watchCache.RLock()
|
|
defer c.watchCache.RUnlock()
|
|
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
|
|
if err != nil {
|
|
// To match the uncached watch implementation, once we have passed authn/authz/admission,
|
|
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
|
|
// rather than a directly returned error.
|
|
return newErrWatcher(err), nil
|
|
}
|
|
|
|
// With some events already sent, update resourceVersion so that
|
|
// events that were buffered and not yet processed won't be delivered
|
|
// to this watcher second time causing going back in time.
|
|
if len(initEvents) > 0 {
|
|
watchRV = initEvents[len(initEvents)-1].ResourceVersion
|
|
}
|
|
|
|
func() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
// Update watcher.forget function once we can compute it.
|
|
watcher.forget = forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
|
|
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
|
|
|
|
// Add it to the queue only when server and client support watch bookmarks.
|
|
if c.watchBookmarkEnabled && watcher.allowWatchBookmarks {
|
|
c.bookmarkWatchers.addWatcher(watcher)
|
|
}
|
|
c.watcherIdx++
|
|
}()
|
|
|
|
go watcher.process(ctx, initEvents, watchRV)
|
|
return watcher, nil
|
|
}
|
|
|
|
// WatchList implements storage.Interface.
|
|
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
|
return c.Watch(ctx, key, resourceVersion, pred)
|
|
}
|
|
|
|
// Get implements storage.Interface.
|
|
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
|
|
if resourceVersion == "" {
|
|
// If resourceVersion is not specified, serve it from underlying
|
|
// storage (for backward compatibility).
|
|
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
|
|
}
|
|
|
|
// If resourceVersion is specified, serve it from cache.
|
|
// It's guaranteed that the returned value is at least that
|
|
// fresh as the given resourceVersion.
|
|
getRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if getRV == 0 && !c.ready.check() {
|
|
// If Cacher is not yet initialized and we don't require any specific
|
|
// minimal resource version, simply forward the request to storage.
|
|
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
|
|
}
|
|
|
|
// Do not create a trace - it's not for free and there are tons
|
|
// of Get requests. We can add it if it will be really needed.
|
|
c.ready.wait()
|
|
|
|
objVal, err := conversion.EnforcePtr(objPtr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(getRV, key, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if exists {
|
|
elem, ok := obj.(*storeElement)
|
|
if !ok {
|
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
|
}
|
|
objVal.Set(reflect.ValueOf(elem.Object).Elem())
|
|
} else {
|
|
objVal.Set(reflect.Zero(objVal.Type()))
|
|
if !ignoreNotFound {
|
|
return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetToList implements storage.Interface.
|
|
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
|
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
|
hasContinuation := pagingEnabled && len(pred.Continue) > 0
|
|
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
|
|
if resourceVersion == "" || hasContinuation || hasLimit {
|
|
// If resourceVersion is not specified, serve it from underlying
|
|
// storage (for backward compatibility). If a continuation is
|
|
// requested, serve it from the underlying storage as well.
|
|
// Limits are only sent to storage when resourceVersion is non-zero
|
|
// since the watch cache isn't able to perform continuations, and
|
|
// limits are ignored when resource version is zero
|
|
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
|
|
}
|
|
|
|
// If resourceVersion is specified, serve it from cache.
|
|
// It's guaranteed that the returned value is at least that
|
|
// fresh as the given resourceVersion.
|
|
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if listRV == 0 && !c.ready.check() {
|
|
// If Cacher is not yet initialized and we don't require any specific
|
|
// minimal resource version, simply forward the request to storage.
|
|
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
|
|
}
|
|
|
|
trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
|
|
defer trace.LogIfLong(500 * time.Millisecond)
|
|
|
|
c.ready.wait()
|
|
trace.Step("Ready")
|
|
|
|
// List elements with at least 'listRV' from cache.
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
listVal, err := conversion.EnforcePtr(listPtr)
|
|
if err != nil || listVal.Kind() != reflect.Slice {
|
|
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
|
}
|
|
filter := filterWithAttrsFunction(key, pred)
|
|
|
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
trace.Step("Got from cache")
|
|
|
|
if exists {
|
|
elem, ok := obj.(*storeElement)
|
|
if !ok {
|
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
|
}
|
|
if filter(elem.Key, elem.Labels, elem.Fields) {
|
|
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
|
}
|
|
}
|
|
if c.versioner != nil {
|
|
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// List implements storage.Interface.
|
|
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
|
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
|
hasContinuation := pagingEnabled && len(pred.Continue) > 0
|
|
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
|
|
if resourceVersion == "" || hasContinuation || hasLimit {
|
|
// If resourceVersion is not specified, serve it from underlying
|
|
// storage (for backward compatibility). If a continuation is
|
|
// requested, serve it from the underlying storage as well.
|
|
// Limits are only sent to storage when resourceVersion is non-zero
|
|
// since the watch cache isn't able to perform continuations, and
|
|
// limits are ignored when resource version is zero.
|
|
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
|
|
}
|
|
|
|
// If resourceVersion is specified, serve it from cache.
|
|
// It's guaranteed that the returned value is at least that
|
|
// fresh as the given resourceVersion.
|
|
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if listRV == 0 && !c.ready.check() {
|
|
// If Cacher is not yet initialized and we don't require any specific
|
|
// minimal resource version, simply forward the request to storage.
|
|
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
|
|
}
|
|
|
|
trace := utiltrace.New(fmt.Sprintf("cacher %v: List", c.objectType.String()))
|
|
defer trace.LogIfLong(500 * time.Millisecond)
|
|
|
|
c.ready.wait()
|
|
trace.Step("Ready")
|
|
|
|
// List elements with at least 'listRV' from cache.
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
listVal, err := conversion.EnforcePtr(listPtr)
|
|
if err != nil || listVal.Kind() != reflect.Slice {
|
|
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
|
}
|
|
filter := filterWithAttrsFunction(key, pred)
|
|
|
|
objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
trace.Step(fmt.Sprintf("Listed %d items from cache", len(objs)))
|
|
if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
|
|
// Resize the slice appropriately, since we already know that none
|
|
// of the elements will be filtered out.
|
|
listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
|
|
trace.Step("Resized result")
|
|
}
|
|
for _, obj := range objs {
|
|
elem, ok := obj.(*storeElement)
|
|
if !ok {
|
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
|
}
|
|
if filter(elem.Key, elem.Labels, elem.Fields) {
|
|
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
|
}
|
|
}
|
|
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
|
|
if c.versioner != nil {
|
|
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GuaranteedUpdate implements storage.Interface.
|
|
func (c *Cacher) GuaranteedUpdate(
|
|
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
|
|
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, _ ...runtime.Object) error {
|
|
// Ignore the suggestion and try to pass down the current version of the object
|
|
// read from cache.
|
|
if elem, exists, err := c.watchCache.GetByKey(key); err != nil {
|
|
klog.Errorf("GetByKey returned error: %v", err)
|
|
} else if exists {
|
|
currObj := elem.(*storeElement).Object.DeepCopyObject()
|
|
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate, currObj)
|
|
}
|
|
// If we couldn't get the object, fallback to no-suggestion.
|
|
return c.storage.GuaranteedUpdate(ctx, key, ptrToType, ignoreNotFound, preconditions, tryUpdate)
|
|
}
|
|
|
|
// Count implements storage.Interface.
|
|
func (c *Cacher) Count(pathPrefix string) (int64, error) {
|
|
return c.storage.Count(pathPrefix)
|
|
}
|
|
|
|
func (c *Cacher) triggerValues(event *watchCacheEvent) ([]string, bool) {
|
|
// TODO: Currently we assume that in a given Cacher object, its <c.triggerFunc>
|
|
// is aware of exactly the same trigger (at most one). Thus calling:
|
|
// c.triggerFunc(<some object>)
|
|
// can return only 0 or 1 values.
|
|
// That means, that triggerValues itself may return up to 2 different values.
|
|
if c.triggerFunc == nil {
|
|
return nil, false
|
|
}
|
|
result := make([]string, 0, 2)
|
|
matchValues := c.triggerFunc(event.Object)
|
|
if len(matchValues) > 0 {
|
|
result = append(result, matchValues[0].Value)
|
|
}
|
|
if event.PrevObject == nil {
|
|
return result, len(result) > 0
|
|
}
|
|
prevMatchValues := c.triggerFunc(event.PrevObject)
|
|
if len(prevMatchValues) > 0 {
|
|
if len(result) == 0 || result[0] != prevMatchValues[0].Value {
|
|
result = append(result, prevMatchValues[0].Value)
|
|
}
|
|
}
|
|
return result, len(result) > 0
|
|
}
|
|
|
|
func (c *Cacher) processEvent(event *watchCacheEvent) {
|
|
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
|
|
// Monitor if this gets backed up, and how much.
|
|
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), curLen)
|
|
}
|
|
c.incoming <- *event
|
|
}
|
|
|
|
func (c *Cacher) dispatchEvents() {
|
|
// Jitter to help level out any aggregate load.
|
|
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
|
|
// Stop the timer when watchBookmarkFeatureGate is not enabled.
|
|
if !c.watchBookmarkEnabled && !bookmarkTimer.Stop() {
|
|
<-bookmarkTimer.C()
|
|
}
|
|
defer bookmarkTimer.Stop()
|
|
|
|
lastProcessedResourceVersion := uint64(0)
|
|
for {
|
|
select {
|
|
case event, ok := <-c.incoming:
|
|
if !ok {
|
|
return
|
|
}
|
|
c.dispatchEvent(&event)
|
|
lastProcessedResourceVersion = event.ResourceVersion
|
|
case <-bookmarkTimer.C():
|
|
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
|
|
// Never send a bookmark event if we did not see an event here, this is fine
|
|
// because we don't provide any guarantees on sending bookmarks.
|
|
if lastProcessedResourceVersion == 0 {
|
|
continue
|
|
}
|
|
bookmarkEvent := &watchCacheEvent{
|
|
Type: watch.Bookmark,
|
|
Object: c.newFunc(),
|
|
ResourceVersion: lastProcessedResourceVersion,
|
|
}
|
|
if err := c.versioner.UpdateObject(bookmarkEvent.Object, bookmarkEvent.ResourceVersion); err != nil {
|
|
klog.Errorf("failure to set resourceVersion to %d on bookmark event %+v", bookmarkEvent.ResourceVersion, bookmarkEvent.Object)
|
|
continue
|
|
}
|
|
c.dispatchEvent(bookmarkEvent)
|
|
case <-c.stopCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
|
|
c.startDispatching(event)
|
|
defer c.finishDispatching()
|
|
// Watchers stopped after startDispatching will be delayed to finishDispatching,
|
|
|
|
// Since add() can block, we explicitly add when cacher is unlocked.
|
|
if event.Type == watch.Bookmark {
|
|
for _, watcher := range c.watchersBuffer {
|
|
watcher.nonblockingAdd(event)
|
|
}
|
|
} else {
|
|
for _, watcher := range c.watchersBuffer {
|
|
watcher.add(event, c.timer, c.dispatchTimeoutBudget)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Cacher) startDispatchingBookmarkEvents() {
|
|
// Pop already expired watchers. However, explicitly ignore stopped ones,
|
|
// as we don't delete watcher from bookmarkWatchers when it is stopped.
|
|
for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() {
|
|
for _, watcher := range watchers {
|
|
// watcher.stop() is protected by c.Lock()
|
|
if watcher.stopped {
|
|
continue
|
|
}
|
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
|
// Given that we send bookmark event once at deadline-2s, never push again
|
|
// after the watcher pops up from the buckets. Once we decide to change the
|
|
// strategy to more sophisticated, we may need it here.
|
|
}
|
|
}
|
|
}
|
|
|
|
// startDispatching chooses watchers potentially interested in a given event
|
|
// a marks dispatching as true.
|
|
func (c *Cacher) startDispatching(event *watchCacheEvent) {
|
|
triggerValues, supported := c.triggerValues(event)
|
|
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
c.dispatching = true
|
|
// We are reusing the slice to avoid memory reallocations in every
|
|
// dispatchEvent() call. That may prevent Go GC from freeing items
|
|
// from previous phases that are sitting behind the current length
|
|
// of the slice, but there is only a limited number of those and the
|
|
// gain from avoiding memory allocations is much bigger.
|
|
c.watchersBuffer = c.watchersBuffer[:0]
|
|
|
|
if event.Type == watch.Bookmark {
|
|
c.startDispatchingBookmarkEvents()
|
|
// return here to reduce following code indentation and diff
|
|
return
|
|
}
|
|
|
|
// Iterate over "allWatchers" no matter what the trigger function is.
|
|
for _, watcher := range c.watchers.allWatchers {
|
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
|
}
|
|
if supported {
|
|
// Iterate over watchers interested in the given values of the trigger.
|
|
for _, triggerValue := range triggerValues {
|
|
for _, watcher := range c.watchers.valueWatchers[triggerValue] {
|
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
|
}
|
|
}
|
|
} else {
|
|
// supported equal to false generally means that trigger function
|
|
// is not defined (or not aware of any indexes). In this case,
|
|
// watchers filters should generally also don't generate any
|
|
// trigger values, but can cause problems in case of some
|
|
// misconfiguration. Thus we paranoidly leave this branch.
|
|
|
|
// Iterate over watchers interested in exact values for all values.
|
|
for _, watchers := range c.watchers.valueWatchers {
|
|
for _, watcher := range watchers {
|
|
c.watchersBuffer = append(c.watchersBuffer, watcher)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// finishDispatching stops all the watchers that were supposed to be
|
|
// stopped in the meantime, but it was deferred to avoid closing input
|
|
// channels of watchers, as add() may still have writing to it.
|
|
// It also marks dispatching as false.
|
|
func (c *Cacher) finishDispatching() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
c.dispatching = false
|
|
for _, watcher := range c.watchersToStop {
|
|
watcher.stop()
|
|
}
|
|
c.watchersToStop = c.watchersToStop[:0]
|
|
}
|
|
|
|
func (c *Cacher) terminateAllWatchers() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
c.watchers.terminateAll(c.objectType, c.stopWatcherThreadUnsafe)
|
|
}
|
|
|
|
func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) {
|
|
if c.dispatching {
|
|
c.watchersToStop = append(c.watchersToStop, watcher)
|
|
} else {
|
|
watcher.stop()
|
|
}
|
|
}
|
|
|
|
func (c *Cacher) isStopped() bool {
|
|
c.stopLock.RLock()
|
|
defer c.stopLock.RUnlock()
|
|
return c.stopped
|
|
}
|
|
|
|
// Stop implements the graceful termination.
|
|
func (c *Cacher) Stop() {
|
|
c.stopLock.Lock()
|
|
if c.stopped {
|
|
// avoid stopping twice (note: cachers are shared with subresources)
|
|
c.stopLock.Unlock()
|
|
return
|
|
}
|
|
c.stopped = true
|
|
c.stopLock.Unlock()
|
|
close(c.stopCh)
|
|
c.stopWg.Wait()
|
|
}
|
|
|
|
func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported bool) func() {
|
|
return func() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
|
|
// It's possible that the watcher is already not in the structure (e.g. in case of
|
|
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stop()
|
|
// on a watcher multiple times.
|
|
c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe)
|
|
}
|
|
}
|
|
|
|
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
|
|
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
|
|
if !hasPathPrefix(objKey, key) {
|
|
return false
|
|
}
|
|
return p.MatchesObjectAttributes(label, field)
|
|
}
|
|
return filterFunc
|
|
}
|
|
|
|
// LastSyncResourceVersion returns resource version to which the underlying cache is synced.
|
|
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
|
c.ready.wait()
|
|
|
|
resourceVersion := c.reflector.LastSyncResourceVersion()
|
|
return c.versioner.ParseResourceVersion(resourceVersion)
|
|
}
|
|
|
|
// cacherListerWatcher opaques storage.Interface to expose cache.ListerWatcher.
|
|
type cacherListerWatcher struct {
|
|
storage storage.Interface
|
|
resourcePrefix string
|
|
newListFunc func() runtime.Object
|
|
}
|
|
|
|
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
|
|
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
|
|
return &cacherListerWatcher{
|
|
storage: storage,
|
|
resourcePrefix: resourcePrefix,
|
|
newListFunc: newListFunc,
|
|
}
|
|
}
|
|
|
|
// Implements cache.ListerWatcher interface.
|
|
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
|
|
list := lw.newListFunc()
|
|
pred := storage.SelectionPredicate{
|
|
Label: labels.Everything(),
|
|
Field: fields.Everything(),
|
|
Limit: options.Limit,
|
|
Continue: options.Continue,
|
|
}
|
|
|
|
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil {
|
|
return nil, err
|
|
}
|
|
return list, nil
|
|
}
|
|
|
|
// Implements cache.ListerWatcher interface.
|
|
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
|
|
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything)
|
|
}
|
|
|
|
// errWatcher implements watch.Interface to return a single error
|
|
type errWatcher struct {
|
|
result chan watch.Event
|
|
}
|
|
|
|
func newErrWatcher(err error) *errWatcher {
|
|
// Create an error event
|
|
errEvent := watch.Event{Type: watch.Error}
|
|
switch err := err.(type) {
|
|
case runtime.Object:
|
|
errEvent.Object = err
|
|
case *errors.StatusError:
|
|
errEvent.Object = &err.ErrStatus
|
|
default:
|
|
errEvent.Object = &metav1.Status{
|
|
Status: metav1.StatusFailure,
|
|
Message: err.Error(),
|
|
Reason: metav1.StatusReasonInternalError,
|
|
Code: http.StatusInternalServerError,
|
|
}
|
|
}
|
|
|
|
// Create a watcher with room for a single event, populate it, and close the channel
|
|
watcher := &errWatcher{result: make(chan watch.Event, 1)}
|
|
watcher.result <- errEvent
|
|
close(watcher.result)
|
|
|
|
return watcher
|
|
}
|
|
|
|
// Implements watch.Interface.
|
|
func (c *errWatcher) ResultChan() <-chan watch.Event {
|
|
return c.result
|
|
}
|
|
|
|
// Implements watch.Interface.
|
|
func (c *errWatcher) Stop() {
|
|
// no-op
|
|
}
|
|
|
|
// cacheWatcher implements watch.Interface
|
|
type cacheWatcher struct {
|
|
sync.Mutex
|
|
input chan *watchCacheEvent
|
|
result chan watch.Event
|
|
done chan struct{}
|
|
filter filterWithAttrsFunc
|
|
stopped bool
|
|
forget func()
|
|
versioner storage.Versioner
|
|
// The watcher will be closed by server after the deadline,
|
|
// save it here to send bookmark events before that.
|
|
deadline time.Time
|
|
allowWatchBookmarks bool
|
|
// Object type of the cache watcher interests
|
|
objectType reflect.Type
|
|
}
|
|
|
|
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc, forget func(), versioner storage.Versioner, deadline time.Time, allowWatchBookmarks bool, objectType reflect.Type) *cacheWatcher {
|
|
return &cacheWatcher{
|
|
input: make(chan *watchCacheEvent, chanSize),
|
|
result: make(chan watch.Event, chanSize),
|
|
done: make(chan struct{}),
|
|
filter: filter,
|
|
stopped: false,
|
|
forget: forget,
|
|
versioner: versioner,
|
|
deadline: deadline,
|
|
allowWatchBookmarks: allowWatchBookmarks,
|
|
objectType: objectType,
|
|
}
|
|
}
|
|
|
|
// Implements watch.Interface.
|
|
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
|
|
return c.result
|
|
}
|
|
|
|
// Implements watch.Interface.
|
|
func (c *cacheWatcher) Stop() {
|
|
c.forget()
|
|
}
|
|
|
|
// TODO(#73958)
|
|
// stop() is protected by Cacher.Lock(), rename it to
|
|
// stopThreadUnsafe and remove the sync.Mutex.
|
|
func (c *cacheWatcher) stop() {
|
|
c.Lock()
|
|
defer c.Unlock()
|
|
if !c.stopped {
|
|
c.stopped = true
|
|
close(c.done)
|
|
close(c.input)
|
|
}
|
|
}
|
|
|
|
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
|
|
// If we can't send it, don't block on it.
|
|
select {
|
|
case c.input <- event:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer, budget *timeBudget) {
|
|
// Try to send the event immediately, without blocking.
|
|
if c.nonblockingAdd(event) {
|
|
return
|
|
}
|
|
|
|
// OK, block sending, but only for up to <timeout>.
|
|
// cacheWatcher.add is called very often, so arrange
|
|
// to reuse timers instead of constantly allocating.
|
|
startTime := time.Now()
|
|
timeout := budget.takeAvailable()
|
|
|
|
timer.Reset(timeout)
|
|
|
|
select {
|
|
case c.input <- event:
|
|
if !timer.Stop() {
|
|
// Consume triggered (but not yet received) timer event
|
|
// so that future reuse does not get a spurious timeout.
|
|
<-timer.C
|
|
}
|
|
case <-timer.C:
|
|
// This means that we couldn't send event to that watcher.
|
|
// Since we don't want to block on it infinitely,
|
|
// we simply terminate it.
|
|
klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", reflect.TypeOf(event.Object).String())
|
|
c.forget()
|
|
}
|
|
|
|
budget.returnUnused(timeout - time.Since(startTime))
|
|
}
|
|
|
|
func (c *cacheWatcher) nextBookmarkTime(now time.Time) (time.Time, bool) {
|
|
// For now we return 2s before deadline (and maybe +infinity is now already passed this time)
|
|
// but it gives us extensibility for the future(false when deadline is not set).
|
|
if c.deadline.IsZero() {
|
|
return c.deadline, false
|
|
}
|
|
return c.deadline.Add(-2 * time.Second), true
|
|
}
|
|
|
|
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
|
|
if event.Type == watch.Bookmark {
|
|
return &watch.Event{Type: watch.Bookmark, Object: event.Object.DeepCopyObject()}
|
|
}
|
|
|
|
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
|
|
oldObjPasses := false
|
|
if event.PrevObject != nil {
|
|
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
|
|
}
|
|
if !curObjPasses && !oldObjPasses {
|
|
// Watcher is not interested in that object.
|
|
return nil
|
|
}
|
|
|
|
switch {
|
|
case curObjPasses && !oldObjPasses:
|
|
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
|
|
case curObjPasses && oldObjPasses:
|
|
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
|
|
case !curObjPasses && oldObjPasses:
|
|
// return a delete event with the previous object content, but with the event's resource version
|
|
oldObj := event.PrevObject.DeepCopyObject()
|
|
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
|
|
}
|
|
return &watch.Event{Type: watch.Deleted, Object: oldObj}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
|
|
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
|
|
watchEvent := c.convertToWatchEvent(event)
|
|
if watchEvent == nil {
|
|
// Watcher is not interested in that object.
|
|
return
|
|
}
|
|
|
|
// We need to ensure that if we put event X to the c.result, all
|
|
// previous events were already put into it before, no matter whether
|
|
// c.done is close or not.
|
|
// Thus we cannot simply select from c.done and c.result and this
|
|
// would give us non-determinism.
|
|
// At the same time, we don't want to block infinitely on putting
|
|
// to c.result, when c.done is already closed.
|
|
|
|
// This ensures that with c.done already close, we at most once go
|
|
// into the next select after this. With that, no matter which
|
|
// statement we choose there, we will deliver only consecutive
|
|
// events.
|
|
select {
|
|
case <-c.done:
|
|
return
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case c.result <- *watchEvent:
|
|
case <-c.done:
|
|
}
|
|
}
|
|
|
|
func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) {
|
|
defer utilruntime.HandleCrash()
|
|
|
|
// Check how long we are processing initEvents.
|
|
// As long as these are not processed, we are not processing
|
|
// any incoming events, so if it takes long, we may actually
|
|
// block all watchers for some time.
|
|
// TODO: From the logs it seems that there happens processing
|
|
// times even up to 1s which is very long. However, this doesn't
|
|
// depend that much on the number of initEvents. E.g. from the
|
|
// 2000-node Kubemark run we have logs like this, e.g.:
|
|
// ... processing 13862 initEvents took 66.808689ms
|
|
// ... processing 14040 initEvents took 993.532539ms
|
|
// We should understand what is blocking us in those cases (e.g.
|
|
// is it lack of CPU, network, or sth else) and potentially
|
|
// consider increase size of result buffer in those cases.
|
|
const initProcessThreshold = 500 * time.Millisecond
|
|
startTime := time.Now()
|
|
for _, event := range initEvents {
|
|
c.sendWatchCacheEvent(event)
|
|
}
|
|
objType := c.objectType.String()
|
|
if len(initEvents) > 0 {
|
|
initCounter.WithLabelValues(objType).Add(float64(len(initEvents)))
|
|
}
|
|
processingTime := time.Since(startTime)
|
|
if processingTime > initProcessThreshold {
|
|
klog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
|
|
}
|
|
|
|
defer close(c.result)
|
|
defer c.Stop()
|
|
for {
|
|
select {
|
|
case event, ok := <-c.input:
|
|
if !ok {
|
|
return
|
|
}
|
|
// only send events newer than resourceVersion
|
|
if event.ResourceVersion > resourceVersion {
|
|
c.sendWatchCacheEvent(event)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type ready struct {
|
|
ok bool
|
|
c *sync.Cond
|
|
}
|
|
|
|
func newReady() *ready {
|
|
return &ready{c: sync.NewCond(&sync.RWMutex{})}
|
|
}
|
|
|
|
func (r *ready) wait() {
|
|
r.c.L.Lock()
|
|
for !r.ok {
|
|
r.c.Wait()
|
|
}
|
|
r.c.L.Unlock()
|
|
}
|
|
|
|
// TODO: Make check() function more sophisticated, in particular
|
|
// allow it to behave as "waitWithTimeout".
|
|
func (r *ready) check() bool {
|
|
rwMutex := r.c.L.(*sync.RWMutex)
|
|
rwMutex.RLock()
|
|
defer rwMutex.RUnlock()
|
|
return r.ok
|
|
}
|
|
|
|
func (r *ready) set(ok bool) {
|
|
r.c.L.Lock()
|
|
defer r.c.L.Unlock()
|
|
r.ok = ok
|
|
r.c.Broadcast()
|
|
}
|