/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/utils/buffer"

	"k8s.io/klog"
)
// SharedInformer provides eventually consistent linkage of its
// clients to the authoritative state of a given collection of
// objects. An object is identified by its API group, kind/resource,
// namespace, and name; the `ObjectMeta.UID` is not part of an
// object's ID as far as this contract is concerned. One
// SharedInformer provides linkage to objects of a particular API
// group and kind/resource. The linked object collection of a
// SharedInformer may be further restricted to one namespace and/or by
// label selector and/or field selector.
//
// The authoritative state of an object is what apiservers provide
// access to, and an object goes through a strict sequence of states.
// An object state is either "absent" or present with a
// ResourceVersion and other appropriate content.
//
// A SharedInformer maintains a local cache, exposed by GetStore() and
// by GetIndexer() in the case of an indexed informer, of the state of
// each relevant object. This cache is eventually consistent with the
// authoritative state. This means that, unless prevented by
// persistent communication problems, if ever a particular object ID X
// is authoritatively associated with a state S then for every
// SharedInformer I whose collection includes (X, S) eventually either
// (1) I's cache associates X with S or a later state of X, (2) I is
// stopped, or (3) the authoritative state service for X terminates.
// To be formally complete, we say that the absent state meets any
// restriction by label selector or field selector.
//
// For a given informer and relevant object ID X, the sequence of
// states that appears in the informer's cache is a subsequence of the
// states authoritatively associated with X. That is, some states
// might never appear in the cache but ordering among the appearing
// states is correct. Note, however, that there is no promise about
// ordering between states seen for different objects.
//
// The local cache starts out empty, and gets populated and updated
// during `Run()`.
//
// As a simple example, if a collection of objects is henceforth
// unchanging, a SharedInformer is created that links to that
// collection, and that SharedInformer is `Run()` then that
// SharedInformer's cache eventually holds an exact copy of that
// collection (unless it is stopped too soon, the authoritative state
// service ends, or communication problems between the two
// persistently thwart achievement).
//
// As another simple example, if the local cache ever holds a
// non-absent state for some object ID and the object is eventually
// removed from the authoritative state then eventually the object is
// removed from the local cache (unless the SharedInformer is stopped
// too soon, the authoritative state service ends, or communication
// problems persistently thwart the desired result).
//
// The keys in the Store are of the form namespace/name for namespaced
// objects, and are simply the name for non-namespaced objects.
// Clients can use `MetaNamespaceKeyFunc(obj)` to extract the key for
// a given object, and `SplitMetaNamespaceKey(key)` to split a key
// into its constituent parts.
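//
// As a rough sketch (`pod` here stands for a hypothetical namespaced
// object supplied by the caller, not anything defined in this file):
//
//	key, err := MetaNamespaceKeyFunc(pod)               // e.g. "some-namespace/some-name"
//	namespace, name, err := SplitMetaNamespaceKey(key)  // back to the constituent parts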
//
// Every query against the local cache is answered entirely from one
// snapshot of the cache's state. Thus, the result of a `List` call
// will not contain two entries with the same namespace and name.
//
// A client is identified here by a ResourceEventHandler. For every
// update to the SharedInformer's local cache and for every client
// added before `Run()`, eventually either the SharedInformer is
// stopped or the client is notified of the update. A client added
// after `Run()` starts gets a startup batch of notifications of
// additions of the objects existing in the cache at the time that
// client was added; also, for every update to the SharedInformer's
// local cache after that client was added, eventually either the
// SharedInformer is stopped or that client is notified of that
// update. Client notifications happen after the corresponding cache
// update and, in the case of a SharedIndexInformer, after the
// corresponding index updates. It is possible that additional cache
// and index updates happen before such a prescribed notification.
// For a given SharedInformer and client, the notifications are
// delivered sequentially. For a given SharedInformer, client, and
// object ID, the notifications are delivered in order. Because
// `ObjectMeta.UID` has no role in identifying objects, it is possible
// that when (1) object O1 with ID (e.g. namespace and name) X and
// `ObjectMeta.UID` U1 in the SharedInformer's local cache is deleted
// and later (2) another object O2 with ID X and ObjectMeta.UID U2 is
// created the informer's clients are not notified of (1) and (2) but
// rather are notified only of an update from O1 to O2. Clients that
// need to detect such cases might do so by comparing the `ObjectMeta.UID`
// field of the old and the new object in the code that handles update
// notifications (i.e. `OnUpdate` method of ResourceEventHandler).
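//
// A minimal sketch of such a check (the handler type `myHandler` is
// hypothetical; it assumes both objects carry ObjectMeta reachable via
// `meta.Accessor`):
//
//	func (h *myHandler) OnUpdate(oldObj, newObj interface{}) {
//		oldMeta, _ := meta.Accessor(oldObj)
//		newMeta, _ := meta.Accessor(newObj)
//		if oldMeta.GetUID() != newMeta.GetUID() {
//			// treat as a delete of the old object followed by an add of the new one
//		}
//	}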
//
// A client must process each notification promptly; a SharedInformer
// is not engineered to deal well with a large backlog of
// notifications to deliver. Lengthy processing should be passed off
// to something else, for example through a
// `client-go/util/workqueue`.
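//
// For example (a rough sketch; `informer` and `queue` are hypothetical,
// with `queue` being a workqueue owned by the client):
//
//	informer.AddEventHandler(ResourceEventHandlerFuncs{
//		AddFunc: func(obj interface{}) {
//			if key, err := MetaNamespaceKeyFunc(obj); err == nil {
//				queue.Add(key) // lengthy processing happens off the notification path
//			}
//		},
//	})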
//
// A delete notification exposes the last locally known non-absent
// state, except that its ResourceVersion is replaced with a
// ResourceVersion in which the object is actually absent.
type SharedInformer interface {
	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
	// period. Events to a single handler are delivered sequentially, but there is no coordination
	// between different handlers.
	AddEventHandler(handler ResourceEventHandler)

	// AddEventHandlerWithResyncPeriod adds an event handler to the
	// shared informer with the requested resync period; zero means
	// this handler does not care about resyncs. The resync operation
	// consists of delivering to the handler an update notification
	// for every object in the informer's local cache; it does not add
	// any interactions with the authoritative storage. Some
	// informers do no resyncs at all, not even for handlers added
	// with a non-zero resyncPeriod. For an informer that does
	// resyncs, and for each handler that requests resyncs, that
	// informer develops a nominal resync period that is no shorter
	// than the requested period but may be longer. The actual time
	// between any two resyncs may be longer than the nominal period
	// because the implementation takes time to do work and there may
	// be competing load and scheduling noise.
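	//
	// As a rough illustration (a minimal sketch; `informer` and
	// `myHandler` are hypothetical names, not defined in this file), a
	// handler that wants refresh notifications roughly every 30
	// seconds could be added with:
	//
	//	informer.AddEventHandlerWithResyncPeriod(myHandler, 30*time.Second)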
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)

	// GetStore returns the informer's local cache as a Store.
	GetStore() Store

	// GetController is deprecated; it does nothing useful.
	GetController() Controller

	// Run starts and runs the shared informer, returning after it stops.
	// The informer will be stopped when stopCh is closed.
	Run(stopCh <-chan struct{})

	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection. This is unrelated to "resync".
	HasSynced() bool

	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
	// store. The value returned is not synchronized with access to the underlying store and is not
	// thread-safe.
	LastSyncResourceVersion() string
}

// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
	SharedInformer
	// AddIndexers add indexers to the informer before it starts.
	AddIndexers(indexers Indexers) error
	GetIndexer() Indexer
}

// NewSharedInformer creates a new instance for the listwatcher.
func NewSharedInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration) SharedInformer {
	return NewSharedIndexInformer(lw, exampleObject, defaultEventHandlerResyncPeriod, Indexers{})
}

// NewSharedIndexInformer creates a new instance for the listwatcher.
// The created informer will not do resyncs if the given
// defaultEventHandlerResyncPeriod is zero. Otherwise: for each
// handler with a non-zero requested resync period, whether added
// before or after the informer starts, the nominal resync period is
// the requested resync period rounded up to a multiple of the
// informer's resync checking period. Such an informer's resync
// checking period is established when the informer starts running,
// and is the maximum of (a) the minimum of the resync periods
// requested before the informer starts and the
// defaultEventHandlerResyncPeriod given here and (b) the constant
// `minimumResyncPeriod` defined in this file.
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}
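
// As a rough usage sketch (a minimal example, not part of this file's
// API; `podListWatcher`, `stopCh`, and `v1.Pod` are assumed to be
// supplied by the caller):
//
//	informer := NewSharedIndexInformer(
//		podListWatcher,  // ListerWatcher for the collection of interest
//		&v1.Pod{},       // example object of the handled type
//		30*time.Second,  // default resync period for handlers added via AddEventHandler
//		Indexers{},      // callers may supply their own IndexFunc entries here
//	)
//	go informer.Run(stopCh)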

// InformerSynced is a function that can be used to determine if an informer has synced.
// This is useful for determining if caches have synced.
type InformerSynced func() bool

const (
	// syncedPollPeriod controls how often you look at the status of your sync funcs
	syncedPollPeriod = 100 * time.Millisecond

	// initialBufferSize is the initial number of event notifications that can be buffered.
	initialBufferSize = 1024
)

// WaitForNamedCacheSync is a wrapper around WaitForCacheSync that generates log messages
// indicating that the caller identified by name is waiting for syncs, followed by
// either a successful or failed sync.
func WaitForNamedCacheSync(controllerName string, stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	klog.Infof("Waiting for caches to sync for %s", controllerName)

	if !WaitForCacheSync(stopCh, cacheSyncs...) {
		utilruntime.HandleError(fmt.Errorf("unable to sync caches for %s", controllerName))
		return false
	}

	klog.Infof("Caches are synced for %s ", controllerName)
	return true
}
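
// A rough usage sketch from a controller's startup path (the names
// `fooInformer`, `barInformer`, and `stopCh` are hypothetical and
// supplied by the caller):
//
//	if !WaitForNamedCacheSync("my-controller", stopCh, fooInformer.HasSynced, barInformer.HasSynced) {
//		return
//	}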

// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shut down.
// Callers should prefer WaitForNamedCacheSync().
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
	err := wait.PollImmediateUntil(syncedPollPeriod,
		func() (bool, error) {
			for _, syncFunc := range cacheSyncs {
				if !syncFunc() {
					return false, nil
				}
			}
			return true, nil
		},
		stopCh)
	if err != nil {
		klog.V(2).Infof("stop requested")
		return false
	}

	klog.V(4).Infof("caches populated")
	return true
}

// `*sharedIndexInformer` implements SharedIndexInformer and has three
// main components. One is an indexed local cache, `indexer Indexer`.
// The second main component is a Controller that pulls
// objects/notifications using the ListerWatcher and pushes them into
// a DeltaFIFO --- whose knownObjects is the informer's local cache
// --- while concurrently Popping Deltas values from that fifo and
// processing them with `sharedIndexInformer::HandleDeltas`. Each
// invocation of HandleDeltas, which is done with the fifo's lock
// held, processes each Delta in turn. For each Delta this both
// updates the local cache and stuffs the relevant notification into
// the sharedProcessor. The third main component is that
// sharedProcessor, which is responsible for relaying those
// notifications to each of the informer's clients.
type sharedIndexInformer struct {
	indexer    Indexer
	controller Controller

	processor             *sharedProcessor
	cacheMutationDetector MutationDetector

	listerWatcher ListerWatcher

	// objectType is an example object of the type this informer is
	// expected to handle. Only the type needs to be right, except
	// that when that is `unstructured.Unstructured` the object's
	// `"apiVersion"` and `"kind"` must also be right.
	objectType runtime.Object

	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
	// shouldResync to check if any of our listeners need a resync.
	resyncCheckPeriod time.Duration

	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
	// value).
	defaultEventHandlerResyncPeriod time.Duration

	// clock allows for testability
	clock clock.Clock

	started, stopped bool
	startedLock      sync.Mutex

	// blockDeltas gives a way to stop all event distribution so that a late event handler
	// can safely join the shared informer.
	blockDeltas sync.Mutex
}

// dummyController hides the fact that a SharedInformer is different from a dedicated one
// where a caller can `Run`. The run method is disconnected in this case, because higher
// level logic will decide when to start the SharedInformer and related controller.
// Because returning information back is always asynchronous, the legacy callers shouldn't
// notice any change in behavior.
type dummyController struct {
	informer *sharedIndexInformer
}

func (v *dummyController) Run(stopCh <-chan struct{}) {
}

func (v *dummyController) HasSynced() bool {
	return v.informer.HasSynced()
}

func (v *dummyController) LastSyncResourceVersion() string {
	return ""
}

type updateNotification struct {
	oldObj interface{}
	newObj interface{}
}

type addNotification struct {
	newObj interface{}
}

type deleteNotification struct {
	oldObj interface{}
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	s.controller.Run(stopCh)
}

func (s *sharedIndexInformer) HasSynced() bool {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return false
	}
	return s.controller.HasSynced()
}

func (s *sharedIndexInformer) LastSyncResourceVersion() string {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.controller == nil {
		return ""
	}
	return s.controller.LastSyncResourceVersion()
}

func (s *sharedIndexInformer) GetStore() Store {
	return s.indexer
}

func (s *sharedIndexInformer) GetIndexer() Indexer {
	return s.indexer
}

func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.started {
		return fmt.Errorf("informer has already started")
	}

	return s.indexer.AddIndexers(indexers)
}

func (s *sharedIndexInformer) GetController() Controller {
	return &dummyController{informer: s}
}

func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired
	}
	if check == 0 {
		klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
		return 0
	}
	if desired < check {
		klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
		return check
	}
	return desired
}

const minimumResyncPeriod = 1 * time.Second

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	// from oldest to newest
	for _, d := range obj.(Deltas) {
		switch d.Type {
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}

				isSync := false
				switch {
				case d.Type == Sync:
					// Sync events are only propagated to listeners that requested resync
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
						if oldAccessor, err := meta.Accessor(old); err == nil {
							// Replaced events that didn't change resourceVersion are treated as resync events
							// and only propagated to listeners that requested resync
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
		case Deleted:
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

// sharedProcessor has a collection of processorListener and can
// distribute a notification object to its listeners. There are two
// kinds of distribute operations. The sync distributions go to a
// subset of the listeners that (a) is recomputed in the occasional
// calls to shouldResync and (b) every listener is initially put in.
// The non-sync distributions go to every listener.
type sharedProcessor struct {
	listenersStarted bool
	listenersLock    sync.RWMutex
	listeners        []*processorListener
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}

func (p *sharedProcessor) addListener(listener *processorListener) {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.addListenerLocked(listener)
	if p.listenersStarted {
		p.wg.Start(listener.run)
		p.wg.Start(listener.pop)
	}
}

func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
	p.listeners = append(p.listeners, listener)
	p.syncingListeners = append(p.syncingListeners, listener)
}

func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
	func() {
		p.listenersLock.RLock()
		defer p.listenersLock.RUnlock()
		for _, listener := range p.listeners {
			p.wg.Start(listener.run)
			p.wg.Start(listener.pop)
		}
		p.listenersStarted = true
	}()
	<-stopCh
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	for _, listener := range p.listeners {
		close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
	}
	p.wg.Wait() // Wait for all .pop() and .run() to stop
}

// shouldResync queries every listener to determine if any of them need a resync, based on each
// listener's resyncPeriod.
func (p *sharedProcessor) shouldResync() bool {
	p.listenersLock.Lock()
	defer p.listenersLock.Unlock()

	p.syncingListeners = []*processorListener{}

	resyncNeeded := false
	now := p.clock.Now()
	for _, listener := range p.listeners {
		// need to loop through all the listeners to see if they need to resync so we can prepare any
		// listeners that are going to be resyncing.
		if listener.shouldResync(now) {
			resyncNeeded = true
			p.syncingListeners = append(p.syncingListeners, listener)
			listener.determineNextResync(now)
		}
	}
	return resyncNeeded
}

func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()

	for _, listener := range p.listeners {
		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
		listener.setResyncPeriod(resyncPeriod)
	}
}

// processorListener relays notifications from a sharedProcessor to
// one ResourceEventHandler --- using two goroutines, two unbuffered
// channels, and an unbounded ring buffer. The `add(notification)`
// function sends the given notification to `addCh`. One goroutine
// runs `pop()`, which pumps notifications from `addCh` to `nextCh`
// using storage in the ring buffer while `nextCh` is not keeping up.
// Another goroutine runs `run()`, which receives notifications from
// `nextCh` and synchronously invokes the appropriate handler method.
//
// processorListener also keeps track of the adjusted requested resync
// period of the listener.
type processorListener struct {
	nextCh chan interface{}
	addCh  chan interface{}

	handler ResourceEventHandler

	// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
	// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
	// added until we OOM.
	// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
	// we should try to do something better.
	pendingNotifications buffer.RingGrowing

	// requestedResyncPeriod is how frequently the listener wants a
	// full resync from the shared informer, but modified by two
	// adjustments. One is imposing a lower bound,
	// `minimumResyncPeriod`. The other is another lower bound, the
	// sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only
	// in AddEventHandlerWithResyncPeriod invocations made after the
	// sharedProcessor starts and (b) only if the informer does
	// resyncs at all.
	requestedResyncPeriod time.Duration

	// resyncPeriod is the threshold that will be used in the logic
	// for this listener. This value differs from
	// requestedResyncPeriod only when the sharedIndexInformer does
	// not do resyncs, in which case the value here is zero. The
	// actual time between resyncs depends on when the
	// sharedProcessor's `shouldResync` function is invoked and when
	// the sharedIndexInformer processes `Sync` type Delta objects.
	resyncPeriod time.Duration

	// nextResync is the earliest time the listener should get a full resync
	nextResync time.Time

	// resyncLock guards access to resyncPeriod and nextResync
	resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
	ret := &processorListener{
		nextCh:                make(chan interface{}),
		addCh:                 make(chan interface{}),
		handler:               handler,
		pendingNotifications:  *buffer.NewRingGrowing(bufferSize),
		requestedResyncPeriod: requestedResyncPeriod,
		resyncPeriod:          resyncPeriod,
	}

	ret.determineNextResync(now)

	return ret
}

func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

func (p *processorListener) pop() {
	defer utilruntime.HandleCrash()
	defer close(p.nextCh) // Tell .run() to stop

	var nextCh chan<- interface{}
	var notification interface{}
	for {
		select {
		case nextCh <- notification:
			// Notification dispatched
			var ok bool
			notification, ok = p.pendingNotifications.ReadOne()
			if !ok { // Nothing to pop
				nextCh = nil // Disable this select case
			}
		case notificationToAdd, ok := <-p.addCh:
			if !ok {
				return
			}
			if notification == nil { // No notification to pop (and pendingNotifications is empty)
				// Optimize the case - skip adding to pendingNotifications
				notification = notificationToAdd
				nextCh = p.nextCh
			} else { // There is already a notification waiting to be dispatched
				p.pendingNotifications.WriteOne(notificationToAdd)
			}
		}
	}
}

func (p *processorListener) run() {
	// this call blocks until the channel is closed. When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted. This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}

// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
// this always returns false.
func (p *processorListener) shouldResync(now time.Time) bool {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	if p.resyncPeriod == 0 {
		return false
	}

	return now.After(p.nextResync) || now.Equal(p.nextResync)
}

func (p *processorListener) determineNextResync(now time.Time) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.nextResync = now.Add(p.resyncPeriod)
}

func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
	p.resyncLock.Lock()
	defer p.resyncLock.Unlock()

	p.resyncPeriod = resyncPeriod
}