2019-01-12 04:58:27 +00:00
/ *
Copyright 2015 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package cacher
import (
"context"
"fmt"
"net/http"
"reflect"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
2019-08-30 18:33:25 +00:00
"k8s.io/apimachinery/pkg/util/clock"
2019-01-12 04:58:27 +00:00
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
2021-07-02 08:43:15 +00:00
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
2019-01-12 04:58:27 +00:00
"k8s.io/client-go/tools/cache"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
2019-04-07 17:07:55 +00:00
utiltrace "k8s.io/utils/trace"
)
// emptyFunc is a no-op callback. Watch hands it to newCacheWatcher as a
// placeholder until the real forget function can be computed under lock.
var emptyFunc = func() {}
const (
	// storageWatchListPageSize is the chunk size the cacher requests from
	// storage for its initial and resync watch lists.
	storageWatchListPageSize int64 = 10000

	// defaultBookmarkFrequency defines how frequently watch bookmarks should be sent
	// in addition to sending a bookmark right before watch deadline.
	//
	// NOTE: Update `eventFreshDuration` when changing this value.
	defaultBookmarkFrequency = time.Minute
)
// Config contains the configuration for a given Cache. It is consumed by
// NewCacherFromConfig.
type Config struct {
// Storage is the underlying storage.Interface that the Cacher delegates to.
Storage storage . Interface
// Versioner is the underlying storage.Versioner used to handle resource versions.
Versioner storage . Versioner
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
ResourcePrefix string
// KeyFunc is used to get a key in the underlying storage for a given object.
KeyFunc func ( runtime . Object ) ( string , error )
2019-04-07 17:07:55 +00:00
// GetAttrsFunc is used to get the labels and fields of an object.
GetAttrsFunc func ( runtime . Object ) ( label labels . Set , field fields . Set , err error )
2019-01-12 04:58:27 +00:00
2019-09-27 21:51:53 +00:00
// IndexerFuncs is used for optimizing the number of watchers that
2019-01-12 04:58:27 +00:00
// need to process an incoming event. NewCacherFromConfig rejects
// configurations with more than one entry.
2019-09-27 21:51:53 +00:00
IndexerFuncs storage . IndexerFuncs
2019-01-12 04:58:27 +00:00
2020-03-26 21:07:15 +00:00
// Indexers is used to accelerate the list operation, falls back to regular list
// operation if no indexer found.
Indexers * cache . Indexers
2019-08-30 18:33:25 +00:00
// NewFunc is a function that creates a new empty object storing an object of type Type.
NewFunc func ( ) runtime . Object
2019-01-12 04:58:27 +00:00
// NewListFunc is a function that creates a new empty object storing a list of
// objects of type Type.
NewListFunc func ( ) runtime . Object
// Codec is checked against an object created by NewFunc (runtime.CheckCodec)
// when the Cacher is constructed.
Codec runtime . Codec
2020-08-10 17:43:49 +00:00
// Clock is the time source for the Cacher. NewCacherFromConfig substitutes
// clock.RealClock{} when it is nil.
Clock clock . Clock
2019-01-12 04:58:27 +00:00
}
// watchersMap maps the number a watcher was registered under to the watcher.
type watchersMap map[int]*cacheWatcher

// addWatcher stores w under number, replacing any watcher previously
// registered with the same number.
func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
	wm[number] = w
}
2019-04-07 17:07:55 +00:00
func ( wm watchersMap ) deleteWatcher ( number int , done func ( * cacheWatcher ) ) {
if watcher , ok := wm [ number ] ; ok {
delete ( wm , number )
done ( watcher )
}
2019-01-12 04:58:27 +00:00
}
2019-04-07 17:07:55 +00:00
func ( wm watchersMap ) terminateAll ( done func ( * cacheWatcher ) ) {
2019-01-12 04:58:27 +00:00
for key , watcher := range wm {
delete ( wm , key )
2019-04-07 17:07:55 +00:00
done ( watcher )
2019-01-12 04:58:27 +00:00
}
}
// indexedWatchers keeps registered watchers in two groups, selected by the
// `supported` flag passed to addWatcher/deleteWatcher.
type indexedWatchers struct {
// allWatchers holds the watchers registered with supported == false.
allWatchers watchersMap
// valueWatchers holds the watchers registered with supported == true,
// grouped by the value they were registered with. Empty groups are removed
// by deleteWatcher.
valueWatchers map [ string ] watchersMap
}
// addWatcher registers w under number. When supported is true the watcher is
// placed in the group for value (created on demand); otherwise it goes into
// allWatchers.
func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, value string, supported bool) {
	if !supported {
		i.allWatchers.addWatcher(w, number)
		return
	}
	group, found := i.valueWatchers[value]
	if !found {
		group = watchersMap{}
		i.valueWatchers[value] = group
	}
	group.addWatcher(w, number)
}
2019-04-07 17:07:55 +00:00
func ( i * indexedWatchers ) deleteWatcher ( number int , value string , supported bool , done func ( * cacheWatcher ) ) {
2019-01-12 04:58:27 +00:00
if supported {
2019-04-07 17:07:55 +00:00
i . valueWatchers [ value ] . deleteWatcher ( number , done )
2019-01-12 04:58:27 +00:00
if len ( i . valueWatchers [ value ] ) == 0 {
delete ( i . valueWatchers , value )
}
} else {
2019-04-07 17:07:55 +00:00
i . allWatchers . deleteWatcher ( number , done )
2019-01-12 04:58:27 +00:00
}
}
2019-04-07 17:07:55 +00:00
func ( i * indexedWatchers ) terminateAll ( objectType reflect . Type , done func ( * cacheWatcher ) ) {
2019-01-12 04:58:27 +00:00
if len ( i . allWatchers ) > 0 || len ( i . valueWatchers ) > 0 {
klog . Warningf ( "Terminating all watchers from cacher %v" , objectType )
}
2019-04-07 17:07:55 +00:00
i . allWatchers . terminateAll ( done )
2019-09-27 21:51:53 +00:00
for _ , watchers := range i . valueWatchers {
2019-04-07 17:07:55 +00:00
watchers . terminateAll ( done )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
i . valueWatchers = map [ string ] watchersMap { }
2019-01-12 04:58:27 +00:00
}
2019-08-30 18:33:25 +00:00
// watcherBookmarkTimeBuckets groups watchers by the second in which their next
// bookmark is due.
//
// As we don't need a high precision here, we keep all watchers timeout within a
// second in a bucket, and pop up them once at the timeout. To be more specific,
// if you set fire time at X, you can get the bookmark within (X-1,X+1) period.
type watcherBookmarkTimeBuckets struct {
2020-12-01 01:06:26 +00:00
// lock is held by addWatcher and popExpiredWatchers while they touch
// watchersBuckets and startBucketID.
lock sync . Mutex
// the key of watchersBuckets is the number of seconds since createTime
2020-08-10 17:43:49 +00:00
watchersBuckets map [ int64 ] [ ] * cacheWatcher
2020-12-01 01:06:26 +00:00
// createTime is the clock reading taken when the structure was created.
createTime time . Time
2020-08-10 17:43:49 +00:00
// startBucketID is the lowest bucket id that has not been popped yet;
// watchers whose bucket would be earlier are placed in this bucket.
startBucketID int64
// clock is the time source used to compute bucket ids.
clock clock . Clock
// bookmarkFrequency is passed to cacheWatcher.nextBookmarkTime when a
// watcher is added.
bookmarkFrequency time . Duration
2019-08-30 18:33:25 +00:00
}
2020-08-10 17:43:49 +00:00
func newTimeBucketWatchers ( clock clock . Clock , bookmarkFrequency time . Duration ) * watcherBookmarkTimeBuckets {
2019-08-30 18:33:25 +00:00
return & watcherBookmarkTimeBuckets {
2020-08-10 17:43:49 +00:00
watchersBuckets : make ( map [ int64 ] [ ] * cacheWatcher ) ,
2020-12-01 01:06:26 +00:00
createTime : clock . Now ( ) ,
startBucketID : 0 ,
2020-08-10 17:43:49 +00:00
clock : clock ,
bookmarkFrequency : bookmarkFrequency ,
2019-08-30 18:33:25 +00:00
}
}
// adds a watcher to the bucket, if the deadline is before the start, it will be
// added to the first one.
func ( t * watcherBookmarkTimeBuckets ) addWatcher ( w * cacheWatcher ) bool {
2020-08-10 17:43:49 +00:00
nextTime , ok := w . nextBookmarkTime ( t . clock . Now ( ) , t . bookmarkFrequency )
2019-08-30 18:33:25 +00:00
if ! ok {
return false
}
2020-12-01 01:06:26 +00:00
bucketID := int64 ( nextTime . Sub ( t . createTime ) / time . Second )
2019-09-27 21:51:53 +00:00
t . lock . Lock ( )
defer t . lock . Unlock ( )
2019-08-30 18:33:25 +00:00
if bucketID < t . startBucketID {
bucketID = t . startBucketID
}
watchers , _ := t . watchersBuckets [ bucketID ]
t . watchersBuckets [ bucketID ] = append ( watchers , w )
return true
}
func ( t * watcherBookmarkTimeBuckets ) popExpiredWatchers ( ) [ ] [ ] * cacheWatcher {
2020-12-01 01:06:26 +00:00
currentBucketID := int64 ( t . clock . Since ( t . createTime ) / time . Second )
2019-08-30 18:33:25 +00:00
// There should be one or two elements in almost all cases
expiredWatchers := make ( [ ] [ ] * cacheWatcher , 0 , 2 )
2019-09-27 21:51:53 +00:00
t . lock . Lock ( )
defer t . lock . Unlock ( )
2019-08-30 18:33:25 +00:00
for ; t . startBucketID <= currentBucketID ; t . startBucketID ++ {
if watchers , ok := t . watchersBuckets [ t . startBucketID ] ; ok {
delete ( t . watchersBuckets , t . startBucketID )
expiredWatchers = append ( expiredWatchers , watchers )
}
}
return expiredWatchers
}
2019-04-07 17:07:55 +00:00
// filterWithAttrsFunc is a predicate over an object's key, label set and field
// set; it returns true when the object passes the filter.
type filterWithAttrsFunc func ( key string , l labels . Set , f fields . Set ) bool
2019-01-12 04:58:27 +00:00
2019-09-27 21:51:53 +00:00
// indexedTriggerFunc pairs the single IndexerFunc accepted by
// NewCacherFromConfig with the name it was registered under.
type indexedTriggerFunc struct {
// indexName is the key of the entry in Config.IndexerFuncs; Watch compares
// it against the predicate's IndexFields.
indexName string
// indexerFunc is the function stored under indexName.
indexerFunc storage . IndexerFunc
}
2019-01-12 04:58:27 +00:00
// Cacher is responsible for serving WATCH and LIST requests for a given
// resource from its internal cache and updating its cache in the background
// based on the underlying storage contents.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
type Cacher struct {
// HighWaterMarks for performance debugging.
// Important: Since HighWaterMark is using sync/atomic, it has to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
incomingHWM storage . HighWaterMark
// Incoming events that should be dispatched to watchers.
incoming chan watchCacheEvent
// Embedded lock. Watch holds it (write-locked) while registering a new
// watcher and advancing watcherIdx.
sync . RWMutex
// Before accessing the cacher's cache, wait for the ready to be ok.
// This is necessary to prevent users from accessing structures that are
// uninitialized or are being repopulated right now.
// ready needs to be set to false when the cacher is paused or stopped.
// ready needs to be set to true when the cacher is ready to use after
// initialization.
ready * ready
// Underlying storage.Interface.
storage storage . Interface
// Expected type of objects in the underlying cache.
objectType reflect . Type
// "sliding window" of recent changes of objects and the current state.
watchCache * watchCache
// reflector feeds watchCache from the underlying storage; startCaching runs
// its ListAndWatch.
reflector * cache . Reflector
// Versioner is used to handle resource versions.
versioner storage . Versioner
2019-08-30 18:33:25 +00:00
// newFunc is a function that creates a new empty object storing an object of type Type.
newFunc func ( ) runtime . Object
2019-09-27 21:51:53 +00:00
// indexedTrigger is used for optimizing the number of watchers that need to process
2019-01-12 04:58:27 +00:00
// an incoming event. It is nil when no IndexerFunc was configured.
2019-09-27 21:51:53 +00:00
indexedTrigger * indexedTriggerFunc
2019-01-12 04:58:27 +00:00
// watcherIdx is the number under which the next watcher will be registered;
// Watch increments it after each registration.
// watchers maps the value of the trigger function that a watcher is
// interested in to the watchers themselves.
watcherIdx int
watchers indexedWatchers
// Defines a time budget that can be spent on waiting for not-ready watchers
// while dispatching event before shutting them down.
2020-12-01 01:06:26 +00:00
dispatchTimeoutBudget timeBudget
2019-01-12 04:58:27 +00:00
// Handling graceful termination.
stopLock sync . RWMutex
stopped bool
stopCh chan struct { }
stopWg sync . WaitGroup
2019-04-07 17:07:55 +00:00
2019-08-30 18:33:25 +00:00
// clock is the time source, taken from Config.Clock.
clock clock . Clock
2019-04-07 17:07:55 +00:00
// timer is used to avoid unnecessary allocations in underlying watchers.
timer * time . Timer
// dispatching determines whether there is currently dispatching of
// any event in flight.
dispatching bool
// watchersBuffer is a list of watchers potentially interested in currently
// dispatched event.
watchersBuffer [ ] * cacheWatcher
2019-09-27 21:51:53 +00:00
// blockedWatchers is a list of watchers whose buffer is currently full.
blockedWatchers [ ] * cacheWatcher
2019-04-07 17:07:55 +00:00
// watchersToStop is a list of watchers that were supposed to be stopped
// during current dispatching, but stopping was deferred to the end of
// dispatching that event to avoid race with closing channels in watchers.
watchersToStop [ ] * cacheWatcher
2019-08-30 18:33:25 +00:00
// Maintain a timeout queue to send the bookmark event before the watcher times out.
bookmarkWatchers * watcherBookmarkTimeBuckets
2019-01-12 04:58:27 +00:00
}
// NewCacherFromConfig creates a new Cacher responsible for servicing WATCH and LIST requests from
// its internal cache and updating its cache in the background based on the
// given configuration.
2019-09-27 21:51:53 +00:00
func NewCacherFromConfig ( config Config ) ( * Cacher , error ) {
2019-08-30 18:33:25 +00:00
stopCh := make ( chan struct { } )
obj := config . NewFunc ( )
2019-01-12 04:58:27 +00:00
// Give this error when it is constructed rather than when you get the
// first watch item, because it's much easier to track down that way.
2019-08-30 18:33:25 +00:00
if err := runtime . CheckCodec ( config . Codec , obj ) ; err != nil {
2019-09-27 21:51:53 +00:00
return nil , fmt . Errorf ( "storage codec doesn't seem to match given type: %v" , err )
}
var indexedTrigger * indexedTriggerFunc
if config . IndexerFuncs != nil {
// For now, we don't support multiple trigger functions defined
// for a given resource.
if len ( config . IndexerFuncs ) > 1 {
return nil , fmt . Errorf ( "cacher %s doesn't support more than one IndexerFunc: " , reflect . TypeOf ( obj ) . String ( ) )
}
for key , value := range config . IndexerFuncs {
if value != nil {
indexedTrigger = & indexedTriggerFunc {
indexName : key ,
indexerFunc : value ,
}
}
}
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
if config . Clock == nil {
config . Clock = clock . RealClock { }
}
objType := reflect . TypeOf ( obj )
2019-01-12 04:58:27 +00:00
cacher := & Cacher {
2019-09-27 21:51:53 +00:00
ready : newReady ( ) ,
storage : config . Storage ,
2020-08-10 17:43:49 +00:00
objectType : objType ,
2019-09-27 21:51:53 +00:00
versioner : config . Versioner ,
newFunc : config . NewFunc ,
indexedTrigger : indexedTrigger ,
watcherIdx : 0 ,
2019-01-12 04:58:27 +00:00
watchers : indexedWatchers {
allWatchers : make ( map [ int ] * cacheWatcher ) ,
valueWatchers : make ( map [ string ] watchersMap ) ,
} ,
// TODO: Figure out the correct value for the buffer size.
incoming : make ( chan watchCacheEvent , 100 ) ,
dispatchTimeoutBudget : newTimeBudget ( stopCh ) ,
// We need to (potentially) stop both:
// - wait.Until go-routine
// - reflector.ListAndWatch
// and there are no guarantees on the order that they will stop.
// So we will be simply closing the channel, and synchronizing on the WaitGroup.
2019-12-12 01:27:03 +00:00
stopCh : stopCh ,
2020-08-10 17:43:49 +00:00
clock : config . Clock ,
2019-12-12 01:27:03 +00:00
timer : time . NewTimer ( time . Duration ( 0 ) ) ,
2020-08-10 17:43:49 +00:00
bookmarkWatchers : newTimeBucketWatchers ( config . Clock , defaultBookmarkFrequency ) ,
2019-01-12 04:58:27 +00:00
}
2019-08-30 18:33:25 +00:00
// Ensure that timer is stopped.
if ! cacher . timer . Stop ( ) {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<- cacher . timer . C
}
watchCache := newWatchCache (
2020-08-10 17:43:49 +00:00
config . KeyFunc , cacher . processEvent , config . GetAttrsFunc , config . Versioner , config . Indexers , config . Clock , objType )
2019-08-30 18:33:25 +00:00
listerWatcher := NewCacherListerWatcher ( config . Storage , config . ResourcePrefix , config . NewListFunc )
reflectorName := "storage/cacher.go:" + config . ResourcePrefix
reflector := cache . NewNamedReflector ( reflectorName , listerWatcher , obj , watchCache , 0 )
// Configure reflector's pager to for an appropriate pagination chunk size for fetching data from
// storage. The pager falls back to full list if paginated list calls fail due to an "Expired" error.
reflector . WatchListPageSize = storageWatchListPageSize
cacher . watchCache = watchCache
cacher . reflector = reflector
2019-01-12 04:58:27 +00:00
go cacher . dispatchEvents ( )
cacher . stopWg . Add ( 1 )
go func ( ) {
defer cacher . stopWg . Done ( )
2019-06-12 21:00:25 +00:00
defer cacher . terminateAllWatchers ( )
2019-01-12 04:58:27 +00:00
wait . Until (
func ( ) {
if ! cacher . isStopped ( ) {
cacher . startCaching ( stopCh )
}
} , time . Second , stopCh ,
)
} ( )
2019-04-07 17:07:55 +00:00
2019-09-27 21:51:53 +00:00
return cacher , nil
2019-01-12 04:58:27 +00:00
}
func ( c * Cacher ) startCaching ( stopChannel <- chan struct { } ) {
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
// It is safe to use the cache after a successful list until a disconnection.
// We start with usable (write) locked. The below OnReplace function will
// unlock it after a successful list. The below defer will then re-lock
// it when this function exits (always due to disconnection), only if
// we actually got a successful list. This cycle will repeat as needed.
successfulList := false
c . watchCache . SetOnReplace ( func ( ) {
successfulList = true
c . ready . set ( true )
2020-08-10 17:43:49 +00:00
klog . V ( 1 ) . Infof ( "cacher (%v): initialized" , c . objectType . String ( ) )
2019-01-12 04:58:27 +00:00
} )
defer func ( ) {
if successfulList {
c . ready . set ( false )
}
} ( )
c . terminateAllWatchers ( )
// Note that since onReplace may be not called due to errors, we explicitly
// need to retry it on errors under lock.
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c . reflector . ListAndWatch ( stopChannel ) ; err != nil {
2020-08-10 17:43:49 +00:00
klog . Errorf ( "cacher (%v): unexpected ListAndWatch error: %v; reinitializing..." , c . objectType . String ( ) , err )
2019-01-12 04:58:27 +00:00
}
}
// Versioner implements storage.Interface by returning the Versioner of the
// underlying storage.
func (c *Cacher) Versioner() storage.Versioner {
	underlying := c.storage
	return underlying.Versioner()
}
// Create implements storage.Interface. The call is passed straight through to
// the underlying storage.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	err := c.storage.Create(ctx, key, obj, out, ttl)
	return err
}
// Delete implements storage.Interface.
2021-03-18 22:40:29 +00:00
func ( c * Cacher ) Delete (
ctx context . Context , key string , out runtime . Object , preconditions * storage . Preconditions ,
validateDeletion storage . ValidateObjectFunc , _ runtime . Object ) error {
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem , exists , err := c . watchCache . GetByKey ( key ) ; err != nil {
klog . Errorf ( "GetByKey returned error: %v" , err )
} else if exists {
// DeepCopy the object since we modify resource version when serializing the
// current object.
currObj := elem . ( * storeElement ) . Object . DeepCopyObject ( )
return c . storage . Delete ( ctx , key , out , preconditions , validateDeletion , currObj )
}
// If we couldn't get the object, fallback to no-suggestion.
return c . storage . Delete ( ctx , key , out , preconditions , validateDeletion , nil )
2019-01-12 04:58:27 +00:00
}
// Watch implements storage.Interface.
//
// It parses opts.ResourceVersion, waits for the cacher to become ready, and
// then registers a new cacheWatcher whose processing goroutine is started with
// the events returned by GetAllEventsSinceThreadUnsafe for that version.
// A resource version that cannot be parsed is returned as an error; a failure
// to obtain the initial events is reported through an error watcher
// (newErrWatcher) together with a nil error.
2020-08-10 17:43:49 +00:00
func ( c * Cacher ) Watch ( ctx context . Context , key string , opts storage . ListOptions ) ( watch . Interface , error ) {
pred := opts . Predicate
watchRV , err := c . versioner . ParseResourceVersion ( opts . ResourceVersion )
2019-01-12 04:58:27 +00:00
if err != nil {
return nil , err
}
c . ready . wait ( )
// The trigger is usable only when the predicate lists the configured index
// name among its IndexFields and requires an exact match on that field.
triggerValue , triggerSupported := "" , false
2019-09-27 21:51:53 +00:00
if c . indexedTrigger != nil {
for _ , field := range pred . IndexFields {
if field == c . indexedTrigger . indexName {
if value , ok := pred . Field . RequiresExactMatch ( field ) ; ok {
triggerValue , triggerSupported = value , true
}
}
}
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// If there is indexedTrigger defined, but triggerSupported is false,
2019-01-12 04:58:27 +00:00
// we can't narrow the amount of events significantly at this point.
//
2019-09-27 21:51:53 +00:00
// That said, currently indexedTrigger is defined only for a couple of resources:
// Pods, Nodes, Secrets and ConfigMaps and there is only a constant
// number of watchers for which triggerSupported is false (excluding those
// issued explicitly by users).
2019-01-12 04:58:27 +00:00
// Thus, to reduce the risk of those watchers blocking all watchers of a
// given resource in the system, we increase the sizes of buffers for them.
chanSize := 10
2019-09-27 21:51:53 +00:00
if c . indexedTrigger != nil && ! triggerSupported {
2019-01-12 04:58:27 +00:00
// TODO: We should tune this value and ideally make it dependent on the
// number of objects of a given type and/or their churn.
chanSize = 1000
}
2019-08-30 18:33:25 +00:00
// Determine watch timeout ('0' means deadline is not set, ignore checking).
// The "ok" result of ctx.Deadline is intentionally dropped: without a
// deadline the zero time.Time is passed on.
deadline , _ := ctx . Deadline ( )
2021-03-18 22:40:29 +00:00
identifier := fmt . Sprintf ( "key: %q, labels: %q, fields: %q" , key , pred . Label , pred . Field )
2019-08-30 18:33:25 +00:00
// Create a watcher here to reduce memory allocations under lock,
// given that memory allocation may trigger GC and block the thread.
// Also note that emptyFunc is a placeholder, until we will be able
// to compute watcher.forget function (which has to happen under lock).
2021-03-18 22:40:29 +00:00
watcher := newCacheWatcher ( chanSize , filterWithAttrsFunction ( key , pred ) , emptyFunc , c . versioner , deadline , pred . AllowWatchBookmarks , c . objectType , identifier )
2019-08-30 18:33:25 +00:00
// We explicitly use the thread-unsafe version and do locking ourselves to ensure that
// no new events will be processed in the meantime. The watchCache will be unlocked
// on return from this function.
// Note that we cannot do it under Cacher lock, to avoid a deadlock, since the
// underlying watchCache is calling processEvent under its lock.
c . watchCache . RLock ( )
defer c . watchCache . RUnlock ( )
initEvents , err := c . watchCache . GetAllEventsSinceThreadUnsafe ( watchRV )
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher ( err ) , nil
}
2019-03-04 01:22:32 +00:00
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher a second time, which would make it go back in time.
if len ( initEvents ) > 0 {
watchRV = initEvents [ len ( initEvents ) - 1 ] . ResourceVersion
}
2019-08-30 18:33:25 +00:00
// Registration happens in a closure so that the Cacher lock is released
// (via defer) before the processing goroutine is started.
func ( ) {
c . Lock ( )
defer c . Unlock ( )
// Update watcher.forget function once we can compute it.
watcher . forget = forgetWatcher ( c , c . watcherIdx , triggerValue , triggerSupported )
c . watchers . addWatcher ( watcher , c . watcherIdx , triggerValue , triggerSupported )
2019-01-12 04:58:27 +00:00
2019-12-12 01:27:03 +00:00
// Add it to the queue only when the client supports watch bookmarks.
if watcher . allowWatchBookmarks {
2019-08-30 18:33:25 +00:00
c . bookmarkWatchers . addWatcher ( watcher )
}
c . watcherIdx ++
} ( )
go watcher . process ( ctx , initEvents , watchRV )
2019-01-12 04:58:27 +00:00
return watcher , nil
}
// WatchList implements storage.Interface.
2020-08-10 17:43:49 +00:00
func ( c * Cacher ) WatchList ( ctx context . Context , key string , opts storage . ListOptions ) ( watch . Interface , error ) {
return c . Watch ( ctx , key , opts )
2019-01-12 04:58:27 +00:00
}
// Get implements storage.Interface.
2020-08-10 17:43:49 +00:00
func ( c * Cacher ) Get ( ctx context . Context , key string , opts storage . GetOptions , objPtr runtime . Object ) error {
if opts . ResourceVersion == "" {
2019-01-12 04:58:27 +00:00
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
2020-08-10 17:43:49 +00:00
return c . storage . Get ( ctx , key , opts , objPtr )
2019-01-12 04:58:27 +00:00
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
2020-08-10 17:43:49 +00:00
getRV , err := c . versioner . ParseResourceVersion ( opts . ResourceVersion )
2019-01-12 04:58:27 +00:00
if err != nil {
return err
}
if getRV == 0 && ! c . ready . check ( ) {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
2020-08-10 17:43:49 +00:00
return c . storage . Get ( ctx , key , opts , objPtr )
2019-01-12 04:58:27 +00:00
}
// Do not create a trace - it's not for free and there are tons
// of Get requests. We can add it if it will be really needed.
c . ready . wait ( )
objVal , err := conversion . EnforcePtr ( objPtr )
if err != nil {
return err
}
obj , exists , readResourceVersion , err := c . watchCache . WaitUntilFreshAndGet ( getRV , key , nil )
if err != nil {
return err
}
if exists {
elem , ok := obj . ( * storeElement )
if ! ok {
return fmt . Errorf ( "non *storeElement returned from storage: %v" , obj )
}
objVal . Set ( reflect . ValueOf ( elem . Object ) . Elem ( ) )
} else {
objVal . Set ( reflect . Zero ( objVal . Type ( ) ) )
2020-08-10 17:43:49 +00:00
if ! opts . IgnoreNotFound {
2019-01-12 04:58:27 +00:00
return storage . NewKeyNotFoundError ( key , int64 ( readResourceVersion ) )
}
}
return nil
}
2021-07-02 08:43:15 +00:00
func shouldDelegateList ( opts storage . ListOptions ) bool {
2020-08-10 17:43:49 +00:00
resourceVersion := opts . ResourceVersion
pred := opts . Predicate
2019-01-12 04:58:27 +00:00
pagingEnabled := utilfeature . DefaultFeatureGate . Enabled ( features . APIListChunking )
hasContinuation := pagingEnabled && len ( pred . Continue ) > 0
hasLimit := pagingEnabled && pred . Limit > 0 && resourceVersion != "0"
2021-07-02 08:43:15 +00:00
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return resourceVersion == "" || hasContinuation || hasLimit || opts . ResourceVersionMatch == metav1 . ResourceVersionMatchExact
}
// GetToList implements storage.Interface.
func ( c * Cacher ) GetToList ( ctx context . Context , key string , opts storage . ListOptions , listObj runtime . Object ) error {
resourceVersion := opts . ResourceVersion
pred := opts . Predicate
if shouldDelegateList ( opts ) {
2020-08-10 17:43:49 +00:00
return c . storage . GetToList ( ctx , key , opts , listObj )
2019-01-12 04:58:27 +00:00
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV , err := c . versioner . ParseResourceVersion ( resourceVersion )
if err != nil {
return err
}
if listRV == 0 && ! c . ready . check ( ) {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
2020-08-10 17:43:49 +00:00
return c . storage . GetToList ( ctx , key , opts , listObj )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
trace := utiltrace . New ( "cacher list" , utiltrace . Field { "type" , c . objectType . String ( ) } )
2019-01-12 04:58:27 +00:00
defer trace . LogIfLong ( 500 * time . Millisecond )
c . ready . wait ( )
trace . Step ( "Ready" )
// List elements with at least 'listRV' from cache.
listPtr , err := meta . GetItemsPtr ( listObj )
if err != nil {
return err
}
listVal , err := conversion . EnforcePtr ( listPtr )
2019-12-12 01:27:03 +00:00
if err != nil {
return err
}
if listVal . Kind ( ) != reflect . Slice {
2019-01-12 04:58:27 +00:00
return fmt . Errorf ( "need a pointer to slice, got %v" , listVal . Kind ( ) )
}
filter := filterWithAttrsFunction ( key , pred )
obj , exists , readResourceVersion , err := c . watchCache . WaitUntilFreshAndGet ( listRV , key , trace )
if err != nil {
return err
}
trace . Step ( "Got from cache" )
if exists {
elem , ok := obj . ( * storeElement )
if ! ok {
return fmt . Errorf ( "non *storeElement returned from storage: %v" , obj )
}
2019-04-07 17:07:55 +00:00
if filter ( elem . Key , elem . Labels , elem . Fields ) {
2019-01-12 04:58:27 +00:00
listVal . Set ( reflect . Append ( listVal , reflect . ValueOf ( elem . Object ) . Elem ( ) ) )
}
}
if c . versioner != nil {
2019-08-30 18:33:25 +00:00
if err := c . versioner . UpdateList ( listObj , readResourceVersion , "" , nil ) ; err != nil {
2019-01-12 04:58:27 +00:00
return err
}
}
return nil
}
// List implements storage.Interface.
2020-08-10 17:43:49 +00:00
func ( c * Cacher ) List ( ctx context . Context , key string , opts storage . ListOptions , listObj runtime . Object ) error {
resourceVersion := opts . ResourceVersion
pred := opts . Predicate
2021-07-02 08:43:15 +00:00
if shouldDelegateList ( opts ) {
2020-08-10 17:43:49 +00:00
return c . storage . List ( ctx , key , opts , listObj )
2019-01-12 04:58:27 +00:00
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV , err := c . versioner . ParseResourceVersion ( resourceVersion )
if err != nil {
return err
}
if listRV == 0 && ! c . ready . check ( ) {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
2020-08-10 17:43:49 +00:00
return c . storage . List ( ctx , key , opts , listObj )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
trace := utiltrace . New ( "cacher list" , utiltrace . Field { "type" , c . objectType . String ( ) } )
2019-01-12 04:58:27 +00:00
defer trace . LogIfLong ( 500 * time . Millisecond )
c . ready . wait ( )
trace . Step ( "Ready" )
// List elements with at least 'listRV' from cache.
listPtr , err := meta . GetItemsPtr ( listObj )
if err != nil {
return err
}
listVal , err := conversion . EnforcePtr ( listPtr )
2019-12-12 01:27:03 +00:00
if err != nil {
return err
}
if listVal . Kind ( ) != reflect . Slice {
2019-01-12 04:58:27 +00:00
return fmt . Errorf ( "need a pointer to slice, got %v" , listVal . Kind ( ) )
}
filter := filterWithAttrsFunction ( key , pred )
2020-03-26 21:07:15 +00:00
objs , readResourceVersion , err := c . watchCache . WaitUntilFreshAndList ( listRV , pred . MatcherIndex ( ) , trace )
2019-01-12 04:58:27 +00:00
if err != nil {
return err
}
2019-09-27 21:51:53 +00:00
trace . Step ( "Listed items from cache" , utiltrace . Field { "count" , len ( objs ) } )
2019-01-12 04:58:27 +00:00
if len ( objs ) > listVal . Cap ( ) && pred . Label . Empty ( ) && pred . Field . Empty ( ) {
// Resize the slice appropriately, since we already know that none
// of the elements will be filtered out.
listVal . Set ( reflect . MakeSlice ( reflect . SliceOf ( c . objectType . Elem ( ) ) , 0 , len ( objs ) ) )
trace . Step ( "Resized result" )
}
for _ , obj := range objs {
elem , ok := obj . ( * storeElement )
if ! ok {
return fmt . Errorf ( "non *storeElement returned from storage: %v" , obj )
}
2019-04-07 17:07:55 +00:00
if filter ( elem . Key , elem . Labels , elem . Fields ) {
2019-01-12 04:58:27 +00:00
listVal . Set ( reflect . Append ( listVal , reflect . ValueOf ( elem . Object ) . Elem ( ) ) )
}
}
2019-09-27 21:51:53 +00:00
trace . Step ( "Filtered items" , utiltrace . Field { "count" , listVal . Len ( ) } )
2019-01-12 04:58:27 +00:00
if c . versioner != nil {
2019-08-30 18:33:25 +00:00
if err := c . versioner . UpdateList ( listObj , readResourceVersion , "" , nil ) ; err != nil {
2019-01-12 04:58:27 +00:00
return err
}
}
return nil
}
// GuaranteedUpdate implements storage.Interface.
func ( c * Cacher ) GuaranteedUpdate (
ctx context . Context , key string , ptrToType runtime . Object , ignoreNotFound bool ,
2020-12-01 01:06:26 +00:00
preconditions * storage . Preconditions , tryUpdate storage . UpdateFunc , _ runtime . Object ) error {
2019-01-12 04:58:27 +00:00
// Ignore the suggestion and try to pass down the current version of the object
// read from cache.
if elem , exists , err := c . watchCache . GetByKey ( key ) ; err != nil {
klog . Errorf ( "GetByKey returned error: %v" , err )
} else if exists {
2021-03-18 22:40:29 +00:00
// DeepCopy the object since we modify resource version when serializing the
// current object.
2019-01-12 04:58:27 +00:00
currObj := elem . ( * storeElement ) . Object . DeepCopyObject ( )
return c . storage . GuaranteedUpdate ( ctx , key , ptrToType , ignoreNotFound , preconditions , tryUpdate , currObj )
}
// If we couldn't get the object, fallback to no-suggestion.
2020-12-01 01:06:26 +00:00
return c . storage . GuaranteedUpdate ( ctx , key , ptrToType , ignoreNotFound , preconditions , tryUpdate , nil )
2019-01-12 04:58:27 +00:00
}
// Count implements storage.Interface by forwarding the request for
// pathPrefix straight to the underlying storage; the watch cache is not
// consulted.
func (c *Cacher) Count(pathPrefix string) (int64, error) {
	total, err := c.storage.Count(pathPrefix)
	return total, err
}
2019-12-12 01:27:03 +00:00
// baseObjectThreadUnsafe returns the object wrapped by a *cachingObject, or
// the argument itself when it is any other type. The wrapped object is read
// directly, without taking any lock, hence "ThreadUnsafe".
func baseObjectThreadUnsafe(object runtime.Object) runtime.Object {
	wrapper, isCaching := object.(*cachingObject)
	if !isCaching {
		return object
	}
	return wrapper.object
}
func ( c * Cacher ) triggerValuesThreadUnsafe ( event * watchCacheEvent ) ( [ ] string , bool ) {
2019-09-27 21:51:53 +00:00
if c . indexedTrigger == nil {
2019-01-12 04:58:27 +00:00
return nil , false
}
2019-09-27 21:51:53 +00:00
2019-01-12 04:58:27 +00:00
result := make ( [ ] string , 0 , 2 )
2019-12-12 01:27:03 +00:00
result = append ( result , c . indexedTrigger . indexerFunc ( baseObjectThreadUnsafe ( event . Object ) ) )
2019-01-12 04:58:27 +00:00
if event . PrevObject == nil {
2019-09-27 21:51:53 +00:00
return result , true
2019-01-12 04:58:27 +00:00
}
2019-12-12 01:27:03 +00:00
prevTriggerValue := c . indexedTrigger . indexerFunc ( baseObjectThreadUnsafe ( event . PrevObject ) )
2019-09-27 21:51:53 +00:00
if result [ 0 ] != prevTriggerValue {
result = append ( result , prevTriggerValue )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
return result , true
2019-01-12 04:58:27 +00:00
}
// processEvent enqueues a copy of event on the incoming channel, from which
// dispatchEvents picks it up. Before enqueueing it records the current queue
// length and logs it whenever a new high-water mark is reached.
func (c *Cacher) processEvent(event *watchCacheEvent) {
	queued := int64(len(c.incoming))
	if newHighWaterMark := c.incomingHWM.Update(queued); newHighWaterMark {
		// Monitor if this gets backed up, and how much.
		klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.objectType.String(), queued)
	}
	c.incoming <- *event
}
func ( c * Cacher ) dispatchEvents ( ) {
2019-08-30 18:33:25 +00:00
// Jitter to help level out any aggregate load.
bookmarkTimer := c . clock . NewTimer ( wait . Jitter ( time . Second , 0.25 ) )
defer bookmarkTimer . Stop ( )
lastProcessedResourceVersion := uint64 ( 0 )
2019-01-12 04:58:27 +00:00
for {
select {
case event , ok := <- c . incoming :
if ! ok {
return
}
2020-12-01 01:06:26 +00:00
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even to the level of subseconds)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
if event . Type != watch . Bookmark {
c . dispatchEvent ( & event )
}
2019-08-30 18:33:25 +00:00
lastProcessedResourceVersion = event . ResourceVersion
case <- bookmarkTimer . C ( ) :
bookmarkTimer . Reset ( wait . Jitter ( time . Second , 0.25 ) )
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
if lastProcessedResourceVersion == 0 {
2019-09-27 21:51:53 +00:00
// pop expired watchers in case there has been no update
c . bookmarkWatchers . popExpiredWatchers ( )
2019-08-30 18:33:25 +00:00
continue
}
bookmarkEvent := & watchCacheEvent {
Type : watch . Bookmark ,
Object : c . newFunc ( ) ,
ResourceVersion : lastProcessedResourceVersion ,
}
if err := c . versioner . UpdateObject ( bookmarkEvent . Object , bookmarkEvent . ResourceVersion ) ; err != nil {
klog . Errorf ( "failure to set resourceVersion to %d on bookmark event %+v" , bookmarkEvent . ResourceVersion , bookmarkEvent . Object )
continue
}
c . dispatchEvent ( bookmarkEvent )
2019-01-12 04:58:27 +00:00
case <- c . stopCh :
return
}
}
}
2019-12-12 01:27:03 +00:00
// setCachingObjects replaces the object that will be serialized for event
// with a cachingObject wrapper, modifying event in place:
//
//   - for Added/Modified events, event.Object is wrapped;
//   - for Deleted events, event.PrevObject is wrapped and the resource version
//     of the wrapped copy is set to event.ResourceVersion using versioner.
//
// Other event types are left untouched. If a wrapper cannot be created the
// failure is logged and the corresponding field keeps its original value.
func setCachingObjects(event *watchCacheEvent, versioner storage.Versioner) {
	switch event.Type {
	case watch.Added, watch.Modified:
		if object, err := newCachingObject(event.Object); err == nil {
			event.Object = object
		} else {
			klog.Errorf("couldn't create cachingObject from: %#v: %v", event.Object, err)
		}
		// Don't wrap PrevObject for update event (for create events it is nil).
		// We only encode those to deliver DELETE watch events, so if
		// event.Object is not nil it can be used only for watchers for which
		// selector was satisfied for its previous version and is no longer
		// satisfied for the current version.
		// This is rare enough that it doesn't justify making deep-copy of the
		// object (done by newCachingObject) every time.
	case watch.Deleted:
		// Don't wrap Object for delete events - these are not to deliver any
		// events. Only wrap PrevObject.
		if object, err := newCachingObject(event.PrevObject); err == nil {
			// Update resource version of the underlying object.
			// event.PrevObject is used to deliver DELETE watch events and
			// for them, we set resourceVersion to <current> instead of
			// the resourceVersion of the last modification of the object.
			updateResourceVersionIfNeeded(object.object, versioner, event.ResourceVersion)
			event.PrevObject = object
		} else {
			// Report the object that was actually passed to newCachingObject
			// (PrevObject, not Object), together with the cause.
			klog.Errorf("couldn't create cachingObject from: %#v: %v", event.PrevObject, err)
		}
	}
}
2019-01-12 04:58:27 +00:00
func ( c * Cacher ) dispatchEvent ( event * watchCacheEvent ) {
2019-04-07 17:07:55 +00:00
c . startDispatching ( event )
2019-08-30 18:33:25 +00:00
defer c . finishDispatching ( )
// Watchers stopped after startDispatching will be delayed to finishDispatching,
2019-04-07 17:07:55 +00:00
// Since add() can block, we explicitly add when cacher is unlocked.
2019-09-27 21:51:53 +00:00
// Dispatching event in nonblocking way first, which make faster watchers
// not be blocked by slower ones.
2019-08-30 18:33:25 +00:00
if event . Type == watch . Bookmark {
for _ , watcher := range c . watchersBuffer {
watcher . nonblockingAdd ( event )
}
} else {
2019-12-12 01:27:03 +00:00
// Set up caching of object serializations only for dispatching this event.
//
// Storing serializations in memory would result in increased memory usage,
// but it would help for caching encodings for watches started from old
// versions. However, we still don't have a convincing data that the gain
// from it justifies increased memory usage, so for now we drop the cached
// serializations after dispatching this event.
//
// Given the deep-copies that are done to create cachingObjects,
// we try to cache serializations only if there are at least 3 watchers.
if len ( c . watchersBuffer ) >= 3 {
// Make a shallow copy to allow overwriting Object and PrevObject.
wcEvent := * event
setCachingObjects ( & wcEvent , c . versioner )
event = & wcEvent
}
2019-09-27 21:51:53 +00:00
c . blockedWatchers = c . blockedWatchers [ : 0 ]
2019-08-30 18:33:25 +00:00
for _ , watcher := range c . watchersBuffer {
2019-09-27 21:51:53 +00:00
if ! watcher . nonblockingAdd ( event ) {
c . blockedWatchers = append ( c . blockedWatchers , watcher )
}
}
if len ( c . blockedWatchers ) > 0 {
// dispatchEvent is called very often, so arrange
// to reuse timers instead of constantly allocating.
startTime := time . Now ( )
timeout := c . dispatchTimeoutBudget . takeAvailable ( )
c . timer . Reset ( timeout )
// Make sure every watcher will try to send event without blocking first,
// even if the timer has already expired.
timer := c . timer
for _ , watcher := range c . blockedWatchers {
if ! watcher . add ( event , timer ) {
// fired, clean the timer by set it to nil.
timer = nil
}
}
// Stop the timer if it is not fired
if timer != nil && ! timer . Stop ( ) {
// Consume triggered (but not yet received) timer event
// so that future reuse does not get a spurious timeout.
<- timer . C
}
c . dispatchTimeoutBudget . returnUnused ( timeout - time . Since ( startTime ) )
2019-08-30 18:33:25 +00:00
}
2019-04-07 17:07:55 +00:00
}
2019-08-30 18:33:25 +00:00
}
2019-04-07 17:07:55 +00:00
2019-08-30 18:33:25 +00:00
func ( c * Cacher ) startDispatchingBookmarkEvents ( ) {
// Pop already expired watchers. However, explicitly ignore stopped ones,
// as we don't delete watcher from bookmarkWatchers when it is stopped.
for _ , watchers := range c . bookmarkWatchers . popExpiredWatchers ( ) {
for _ , watcher := range watchers {
2019-09-27 21:51:53 +00:00
// c.Lock() is held here.
// watcher.stopThreadUnsafe() is protected by c.Lock()
2019-08-30 18:33:25 +00:00
if watcher . stopped {
continue
}
c . watchersBuffer = append ( c . watchersBuffer , watcher )
2020-08-10 17:43:49 +00:00
// Requeue the watcher for the next bookmark if needed.
c . bookmarkWatchers . addWatcher ( watcher )
2019-08-30 18:33:25 +00:00
}
}
2019-04-07 17:07:55 +00:00
}
// startDispatching chooses watchers potentially interested in a given event
// a marks dispatching as true.
func ( c * Cacher ) startDispatching ( event * watchCacheEvent ) {
2019-12-12 01:27:03 +00:00
// It is safe to call triggerValuesThreadUnsafe here, because at this
// point only this thread can access this event (we create a separate
// watchCacheEvent for every dispatch).
triggerValues , supported := c . triggerValuesThreadUnsafe ( event )
2019-01-12 04:58:27 +00:00
c . Lock ( )
defer c . Unlock ( )
2019-04-07 17:07:55 +00:00
c . dispatching = true
// We are reusing the slice to avoid memory reallocations in every
// dispatchEvent() call. That may prevent Go GC from freeing items
// from previous phases that are sitting behind the current length
// of the slice, but there is only a limited number of those and the
// gain from avoiding memory allocations is much bigger.
c . watchersBuffer = c . watchersBuffer [ : 0 ]
2019-08-30 18:33:25 +00:00
if event . Type == watch . Bookmark {
c . startDispatchingBookmarkEvents ( )
// return here to reduce following code indentation and diff
return
}
2019-01-12 04:58:27 +00:00
// Iterate over "allWatchers" no matter what the trigger function is.
for _ , watcher := range c . watchers . allWatchers {
2019-04-07 17:07:55 +00:00
c . watchersBuffer = append ( c . watchersBuffer , watcher )
2019-01-12 04:58:27 +00:00
}
if supported {
// Iterate over watchers interested in the given values of the trigger.
for _ , triggerValue := range triggerValues {
for _ , watcher := range c . watchers . valueWatchers [ triggerValue ] {
2019-04-07 17:07:55 +00:00
c . watchersBuffer = append ( c . watchersBuffer , watcher )
2019-01-12 04:58:27 +00:00
}
}
} else {
// supported equal to false generally means that trigger function
// is not defined (or not aware of any indexes). In this case,
// watchers filters should generally also don't generate any
// trigger values, but can cause problems in case of some
// misconfiguration. Thus we paranoidly leave this branch.
// Iterate over watchers interested in exact values for all values.
for _ , watchers := range c . watchers . valueWatchers {
for _ , watcher := range watchers {
2019-04-07 17:07:55 +00:00
c . watchersBuffer = append ( c . watchersBuffer , watcher )
2019-01-12 04:58:27 +00:00
}
}
}
}
2019-04-07 17:07:55 +00:00
// finishDispatching stops all the watchers that were supposed to be
// stopped in the meantime, but it was deferred to avoid closing input
// channels of watchers, as add() may still have writing to it.
// It also marks dispatching as false.
func ( c * Cacher ) finishDispatching ( ) {
c . Lock ( )
defer c . Unlock ( )
c . dispatching = false
for _ , watcher := range c . watchersToStop {
2019-09-27 21:51:53 +00:00
watcher . stopThreadUnsafe ( )
2019-04-07 17:07:55 +00:00
}
c . watchersToStop = c . watchersToStop [ : 0 ]
}
2019-01-12 04:58:27 +00:00
func ( c * Cacher ) terminateAllWatchers ( ) {
c . Lock ( )
defer c . Unlock ( )
2019-04-07 17:07:55 +00:00
c . watchers . terminateAll ( c . objectType , c . stopWatcherThreadUnsafe )
}
func ( c * Cacher ) stopWatcherThreadUnsafe ( watcher * cacheWatcher ) {
if c . dispatching {
c . watchersToStop = append ( c . watchersToStop , watcher )
} else {
2019-09-27 21:51:53 +00:00
watcher . stopThreadUnsafe ( )
2019-04-07 17:07:55 +00:00
}
2019-01-12 04:58:27 +00:00
}
// isStopped reports whether Stop has already been called on this cacher.
// The flag is read while holding stopLock for reading.
func (c *Cacher) isStopped() bool {
	c.stopLock.RLock()
	stopped := c.stopped
	c.stopLock.RUnlock()
	return stopped
}
// Stop implements the graceful termination.
func ( c * Cacher ) Stop ( ) {
c . stopLock . Lock ( )
if c . stopped {
2019-08-30 18:33:25 +00:00
// avoid stopping twice (note: cachers are shared with subresources)
2019-01-12 04:58:27 +00:00
c . stopLock . Unlock ( )
return
}
c . stopped = true
c . stopLock . Unlock ( )
close ( c . stopCh )
c . stopWg . Wait ( )
}
2019-04-07 17:07:55 +00:00
func forgetWatcher ( c * Cacher , index int , triggerValue string , triggerSupported bool ) func ( ) {
return func ( ) {
c . Lock ( )
defer c . Unlock ( )
2019-01-12 04:58:27 +00:00
// It's possible that the watcher is already not in the structure (e.g. in case of
2019-09-27 21:51:53 +00:00
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe()
2019-04-07 17:07:55 +00:00
// on a watcher multiple times.
c . watchers . deleteWatcher ( index , triggerValue , triggerSupported , c . stopWatcherThreadUnsafe )
2019-01-12 04:58:27 +00:00
}
}
func filterWithAttrsFunction ( key string , p storage . SelectionPredicate ) filterWithAttrsFunc {
2019-04-07 17:07:55 +00:00
filterFunc := func ( objKey string , label labels . Set , field fields . Set ) bool {
2019-01-12 04:58:27 +00:00
if ! hasPathPrefix ( objKey , key ) {
return false
}
2019-04-07 17:07:55 +00:00
return p . MatchesObjectAttributes ( label , field )
2019-01-12 04:58:27 +00:00
}
return filterFunc
}
// LastSyncResourceVersion returns resource version to which the underlying
// cache is synced. It blocks until the cacher is ready, then parses the
// reflector's last synced resource version with the cacher's versioner.
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
	c.ready.wait()

	synced := c.reflector.LastSyncResourceVersion()
	parsed, err := c.versioner.ParseResourceVersion(synced)
	return parsed, err
}
// cacherListerWatcher wraps a storage.Interface so that it can be exposed as
// a cache.ListerWatcher.
type cacherListerWatcher struct {
	// storage is the backend that List and Watch requests are forwarded to.
	storage storage . Interface
	// resourcePrefix is the storage key passed with every List and Watch request.
	resourcePrefix string
	// newListFunc allocates the empty list object that List asks storage to fill.
	newListFunc func ( ) runtime . Object
}
2019-08-30 18:33:25 +00:00
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
func NewCacherListerWatcher ( storage storage . Interface , resourcePrefix string , newListFunc func ( ) runtime . Object ) cache . ListerWatcher {
2019-01-12 04:58:27 +00:00
return & cacherListerWatcher {
storage : storage ,
resourcePrefix : resourcePrefix ,
newListFunc : newListFunc ,
}
}
// Implements cache.ListerWatcher interface.
func ( lw * cacherListerWatcher ) List ( options metav1 . ListOptions ) ( runtime . Object , error ) {
list := lw . newListFunc ( )
2019-08-30 18:33:25 +00:00
pred := storage . SelectionPredicate {
Label : labels . Everything ( ) ,
Field : fields . Everything ( ) ,
Limit : options . Limit ,
Continue : options . Continue ,
}
2020-08-10 17:43:49 +00:00
if err := lw . storage . List ( context . TODO ( ) , lw . resourcePrefix , storage . ListOptions { ResourceVersionMatch : options . ResourceVersionMatch , Predicate : pred } , list ) ; err != nil {
2019-01-12 04:58:27 +00:00
return nil , err
}
return list , nil
}
// Implements cache.ListerWatcher interface.
func ( lw * cacherListerWatcher ) Watch ( options metav1 . ListOptions ) ( watch . Interface , error ) {
2020-12-01 01:06:26 +00:00
opts := storage . ListOptions {
ResourceVersion : options . ResourceVersion ,
Predicate : storage . Everything ,
}
if utilfeature . DefaultFeatureGate . Enabled ( features . EfficientWatchResumption ) {
opts . ProgressNotify = true
}
return lw . storage . WatchList ( context . TODO ( ) , lw . resourcePrefix , opts )
2019-01-12 04:58:27 +00:00
}
// errWatcher implements watch.Interface to return a single error
type errWatcher struct {
	// result carries the error event; newErrWatcher buffers that one event
	// and closes the channel before handing the watcher out.
	result chan watch . Event
}
// newErrWatcher returns a watcher whose result channel yields exactly one
// watch.Error event describing err and is then closed.
//
// The event object is err itself when err is a runtime.Object, the embedded
// status for a *errors.StatusError, and otherwise a generic internal-error
// Status carrying err's message.
func newErrWatcher(err error) *errWatcher {
	var payload runtime.Object
	switch typed := err.(type) {
	case runtime.Object:
		payload = typed
	case *errors.StatusError:
		payload = &typed.ErrStatus
	default:
		payload = &metav1.Status{
			Status:  metav1.StatusFailure,
			Message: typed.Error(),
			Reason:  metav1.StatusReasonInternalError,
			Code:    http.StatusInternalServerError,
		}
	}

	// Room for a single event: populate it and close the channel right away.
	events := make(chan watch.Event, 1)
	events <- watch.Event{Type: watch.Error, Object: payload}
	close(events)

	return &errWatcher{result: events}
}
// ResultChan implements watch.Interface. It exposes the watcher's result
// channel as receive-only.
func (c *errWatcher) ResultChan() <-chan watch.Event {
	var events <-chan watch.Event = c.result
	return events
}
// Stop implements watch.Interface. It does nothing: the result channel is
// already closed by newErrWatcher.
func (c *errWatcher) Stop() {
	// Intentionally empty.
}
// cacheWatcher implements watch.Interface
2019-09-27 21:51:53 +00:00
// this is not thread-safe
2019-01-12 04:58:27 +00:00
type cacheWatcher struct {
input chan * watchCacheEvent
result chan watch . Event
done chan struct { }
filter filterWithAttrsFunc
stopped bool
2019-04-07 17:07:55 +00:00
forget func ( )
2019-01-12 04:58:27 +00:00
versioner storage . Versioner
2019-08-30 18:33:25 +00:00
// The watcher will be closed by server after the deadline,
// save it here to send bookmark events before that.
deadline time . Time
allowWatchBookmarks bool
// Object type of the cache watcher interests
objectType reflect . Type
2021-03-18 22:40:29 +00:00
// human readable identifier that helps assigning cacheWatcher
// instance with request
identifier string
2019-01-12 04:58:27 +00:00
}
2021-03-18 22:40:29 +00:00
func newCacheWatcher ( chanSize int , filter filterWithAttrsFunc , forget func ( ) , versioner storage . Versioner , deadline time . Time , allowWatchBookmarks bool , objectType reflect . Type , identifier string ) * cacheWatcher {
2019-08-30 18:33:25 +00:00
return & cacheWatcher {
input : make ( chan * watchCacheEvent , chanSize ) ,
result : make ( chan watch . Event , chanSize ) ,
done : make ( chan struct { } ) ,
filter : filter ,
stopped : false ,
forget : forget ,
versioner : versioner ,
deadline : deadline ,
allowWatchBookmarks : allowWatchBookmarks ,
objectType : objectType ,
2021-03-18 22:40:29 +00:00
identifier : identifier ,
2019-01-12 04:58:27 +00:00
}
}
// ResultChan implements watch.Interface. It exposes the watcher's result
// channel as receive-only.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
	var events <-chan watch.Event = c.result
	return events
}
// Implements watch.Interface.
func ( c * cacheWatcher ) Stop ( ) {
2019-04-07 17:07:55 +00:00
c . forget ( )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock()
func ( c * cacheWatcher ) stopThreadUnsafe ( ) {
2019-01-12 04:58:27 +00:00
if ! c . stopped {
c . stopped = true
close ( c . done )
close ( c . input )
}
}
2019-08-30 18:33:25 +00:00
func ( c * cacheWatcher ) nonblockingAdd ( event * watchCacheEvent ) bool {
2019-01-12 04:58:27 +00:00
select {
case c . input <- event :
2019-08-30 18:33:25 +00:00
return true
2019-01-12 04:58:27 +00:00
default :
2019-08-30 18:33:25 +00:00
return false
}
}
2019-09-27 21:51:53 +00:00
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher)
func ( c * cacheWatcher ) add ( event * watchCacheEvent , timer * time . Timer ) bool {
2019-08-30 18:33:25 +00:00
// Try to send the event immediately, without blocking.
if c . nonblockingAdd ( event ) {
2019-09-27 21:51:53 +00:00
return true
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
closeFunc := func ( ) {
2019-01-12 04:58:27 +00:00
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
2021-03-18 22:40:29 +00:00
klog . V ( 1 ) . Infof ( "Forcing %v watcher close due to unresponsiveness: %v. len(c.input) = %v, len(c.result) = %v" , c . objectType . String ( ) , c . identifier , len ( c . input ) , len ( c . result ) )
terminatedWatchersCounter . WithLabelValues ( c . objectType . String ( ) ) . Inc ( )
2019-04-07 17:07:55 +00:00
c . forget ( )
2019-01-12 04:58:27 +00:00
}
2019-09-27 21:51:53 +00:00
if timer == nil {
closeFunc ( )
return false
}
// OK, block sending, but only until timer fires.
select {
case c . input <- event :
return true
case <- timer . C :
closeFunc ( )
return false
}
2019-01-12 04:58:27 +00:00
}
2020-08-10 17:43:49 +00:00
func ( c * cacheWatcher ) nextBookmarkTime ( now time . Time , bookmarkFrequency time . Duration ) ( time . Time , bool ) {
// We try to send bookmarks:
// (a) roughly every minute
// (b) right before the watcher timeout - for now we simply set it 2s before
// the deadline
// The former gives us periodicity if the watch breaks due to unexpected
// conditions, the later ensures that on timeout the watcher is as close to
// now as possible - this covers 99% of cases.
heartbeatTime := now . Add ( bookmarkFrequency )
2019-08-30 18:33:25 +00:00
if c . deadline . IsZero ( ) {
2020-08-10 17:43:49 +00:00
// Timeout is set by our client libraries (e.g. reflector) as well as defaulted by
// apiserver if properly configured. So this shoudln't happen in practice.
return heartbeatTime , true
}
if pretimeoutTime := c . deadline . Add ( - 2 * time . Second ) ; pretimeoutTime . Before ( heartbeatTime ) {
heartbeatTime = pretimeoutTime
}
if heartbeatTime . Before ( now ) {
return time . Time { } , false
2019-08-30 18:33:25 +00:00
}
2020-08-10 17:43:49 +00:00
return heartbeatTime , true
2019-08-30 18:33:25 +00:00
}
2019-12-12 01:27:03 +00:00
// getEventObject returns the object to embed in an outgoing watch event:
// object itself when it is a runtime.CacheableObject, otherwise a deep copy
// of it.
func getEventObject(object runtime.Object) runtime.Object {
	switch object.(type) {
	case runtime.CacheableObject:
		// It is safe to return without deep-copy, because the underlying
		// object was already deep-copied during construction.
		return object
	default:
		return object.DeepCopyObject()
	}
}
// updateResourceVersionIfNeeded sets resourceVersion on object through
// versioner, unless object is a *cachingObject. A failure to update is
// reported via utilruntime.HandleError and not returned to the caller.
func updateResourceVersionIfNeeded(object runtime.Object, versioner storage.Versioner, resourceVersion uint64) {
	_, alreadyVersioned := object.(*cachingObject)
	if alreadyVersioned {
		// We assume that for cachingObject resourceVersion was already propagated before.
		return
	}

	err := versioner.UpdateObject(object, resourceVersion)
	if err == nil {
		return
	}
	utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", resourceVersion, object, err))
}
2019-08-30 18:33:25 +00:00
func ( c * cacheWatcher ) convertToWatchEvent ( event * watchCacheEvent ) * watch . Event {
if event . Type == watch . Bookmark {
return & watch . Event { Type : watch . Bookmark , Object : event . Object . DeepCopyObject ( ) }
}
2019-04-07 17:07:55 +00:00
curObjPasses := event . Type != watch . Deleted && c . filter ( event . Key , event . ObjLabels , event . ObjFields )
2019-01-12 04:58:27 +00:00
oldObjPasses := false
if event . PrevObject != nil {
2019-04-07 17:07:55 +00:00
oldObjPasses = c . filter ( event . Key , event . PrevObjLabels , event . PrevObjFields )
2019-01-12 04:58:27 +00:00
}
if ! curObjPasses && ! oldObjPasses {
// Watcher is not interested in that object.
2019-08-30 18:33:25 +00:00
return nil
2019-01-12 04:58:27 +00:00
}
switch {
case curObjPasses && ! oldObjPasses :
2019-12-12 01:27:03 +00:00
return & watch . Event { Type : watch . Added , Object : getEventObject ( event . Object ) }
2019-01-12 04:58:27 +00:00
case curObjPasses && oldObjPasses :
2019-12-12 01:27:03 +00:00
return & watch . Event { Type : watch . Modified , Object : getEventObject ( event . Object ) }
2019-01-12 04:58:27 +00:00
case ! curObjPasses && oldObjPasses :
// return a delete event with the previous object content, but with the event's resource version
2019-12-12 01:27:03 +00:00
oldObj := getEventObject ( event . PrevObject )
updateResourceVersionIfNeeded ( oldObj , c . versioner , event . ResourceVersion )
2019-08-30 18:33:25 +00:00
return & watch . Event { Type : watch . Deleted , Object : oldObj }
}
return nil
}
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func ( c * cacheWatcher ) sendWatchCacheEvent ( event * watchCacheEvent ) {
watchEvent := c . convertToWatchEvent ( event )
if watchEvent == nil {
// Watcher is not interested in that object.
return
2019-01-12 04:58:27 +00:00
}
// We need to ensure that if we put event X to the c.result, all
// previous events were already put into it before, no matter whether
// c.done is close or not.
// Thus we cannot simply select from c.done and c.result and this
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.
// This ensures that with c.done already close, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
// events.
select {
case <- c . done :
return
default :
}
select {
2019-08-30 18:33:25 +00:00
case c . result <- * watchEvent :
2019-01-12 04:58:27 +00:00
case <- c . done :
}
}
2019-08-30 18:33:25 +00:00
func ( c * cacheWatcher ) process ( ctx context . Context , initEvents [ ] * watchCacheEvent , resourceVersion uint64 ) {
2019-01-12 04:58:27 +00:00
defer utilruntime . HandleCrash ( )
// Check how long we are processing initEvents.
// As long as these are not processed, we are not processing
// any incoming events, so if it takes long, we may actually
// block all watchers for some time.
// TODO: From the logs it seems that there happens processing
// times even up to 1s which is very long. However, this doesn't
// depend that much on the number of initEvents. E.g. from the
// 2000-node Kubemark run we have logs like this, e.g.:
// ... processing 13862 initEvents took 66.808689ms
// ... processing 14040 initEvents took 993.532539ms
// We should understand what is blocking us in those cases (e.g.
// is it lack of CPU, network, or sth else) and potentially
// consider increase size of result buffer in those cases.
const initProcessThreshold = 500 * time . Millisecond
startTime := time . Now ( )
for _ , event := range initEvents {
c . sendWatchCacheEvent ( event )
}
2019-08-30 18:33:25 +00:00
objType := c . objectType . String ( )
2019-04-07 17:07:55 +00:00
if len ( initEvents ) > 0 {
initCounter . WithLabelValues ( objType ) . Add ( float64 ( len ( initEvents ) ) )
}
2019-01-12 04:58:27 +00:00
processingTime := time . Since ( startTime )
if processingTime > initProcessThreshold {
2021-03-18 22:40:29 +00:00
klog . V ( 2 ) . Infof ( "processing %d initEvents of %s (%s) took %v" , len ( initEvents ) , objType , c . identifier , processingTime )
2019-01-12 04:58:27 +00:00
}
2021-07-02 08:43:15 +00:00
// At this point we already start processing incoming watch events.
// However, the init event can still be processed because their serialization
// and sending to the client happens asynchrnously.
// TODO: As describe in the KEP, we would like to estimate that by delaying
// the initialization signal proportionally to the number of events to
// process, but we're leaving this to the tuning phase.
utilflowcontrol . WatchInitialized ( ctx )
2019-01-12 04:58:27 +00:00
defer close ( c . result )
defer c . Stop ( )
2019-08-30 18:33:25 +00:00
for {
select {
case event , ok := <- c . input :
if ! ok {
return
}
// only send events newer than resourceVersion
if event . ResourceVersion > resourceVersion {
c . sendWatchCacheEvent ( event )
}
case <- ctx . Done ( ) :
return
2019-01-12 04:58:27 +00:00
}
}
}
// ready is a boolean flag that callers can block on (wait), poll (check) and
// change (set).
type ready struct {
	// ok is the current state of the flag; it is accessed under c.L.
	ok bool
	// c wakes up waiters when the flag changes. newReady creates it with a
	// *sync.RWMutex as its Locker, which check relies on.
	c * sync . Cond
}
func newReady ( ) * ready {
2019-08-30 18:33:25 +00:00
return & ready { c : sync . NewCond ( & sync . RWMutex { } ) }
2019-01-12 04:58:27 +00:00
}
// wait blocks until the flag is true. It returns immediately when the flag
// is already set.
func (r *ready) wait() {
	cond := r.c
	cond.L.Lock()
	for {
		if r.ok {
			break
		}
		// Releases the lock while sleeping and re-acquires it before returning.
		cond.Wait()
	}
	cond.L.Unlock()
}
// TODO: Make check() function more sophisticated, in particular
// allow it to behave as "waitWithTimeout".
func ( r * ready ) check ( ) bool {
2019-08-30 18:33:25 +00:00
rwMutex := r . c . L . ( * sync . RWMutex )
rwMutex . RLock ( )
defer rwMutex . RUnlock ( )
2019-01-12 04:58:27 +00:00
return r . ok
}
// set stores ok as the new value of the flag and wakes up every goroutine
// blocked in wait. Both happen while the lock is held.
func (r *ready) set(ok bool) {
	lock := r.c.L
	lock.Lock()
	defer lock.Unlock()

	r.ok = ok
	r.c.Broadcast()
}