diff --git a/staging/src/k8s.io/client-go/tools/cache/controller.go b/staging/src/k8s.io/client-go/tools/cache/controller.go
index fc756eb8f3..2159014d60 100644
--- a/staging/src/k8s.io/client-go/tools/cache/controller.go
+++ b/staging/src/k8s.io/client-go/tools/cache/controller.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/clock"
 )
 
 // Config contains all the settings for a Controller.
@@ -50,6 +51,11 @@ type Config struct {
 	// queue.
 	FullResyncPeriod time.Duration
 
+	// ShouldResync, if specified, is invoked when the controller's reflector determines the next
+	// periodic sync should occur. If this returns true, it means the reflector should proceed with
+	// the resync.
+	ShouldResync ShouldResyncFunc
+
 	// If true, when Process() returns an error, re-enqueue the object.
 	// TODO: add interface to let you inject a delay/backoff or drop
 	//       the object completely if desired. Pass the object in
@@ -57,6 +63,11 @@ type Config struct {
 	RetryOnError bool
 }
 
+// ShouldResyncFunc is a type of function that indicates if a reflector should perform a
+// resync or not. It can be used by a shared informer to support multiple event handlers with custom
+// resync periods.
+type ShouldResyncFunc func() bool
+
 // ProcessFunc processes a single object.
 type ProcessFunc func(obj interface{}) error
 
@@ -65,6 +76,7 @@ type controller struct {
 	config         Config
 	reflector      *Reflector
 	reflectorMutex sync.RWMutex
+	clock          clock.Clock
 }
 
 type Controller interface {
@@ -77,6 +89,7 @@ type Controller interface {
 func New(c *Config) Controller {
 	ctlr := &controller{
 		config: *c,
+		clock:  &clock.RealClock{},
 	}
 	return ctlr
 }
@@ -92,6 +105,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
 		c.config.Queue,
 		c.config.FullResyncPeriod,
 	)
+	r.ShouldResync = c.config.ShouldResync
+	r.clock = c.clock
 
 	c.reflectorMutex.Lock()
 	c.reflector = r
diff --git a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go
index c26ab023e1..dbffcc53f7 100644
--- a/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go
+++ b/staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go
@@ -26,7 +26,7 @@ import (
 // TestPopReleaseLock tests that when processor listener blocks on chan,
 // it should release the lock for pendingNotifications.
 func TestPopReleaseLock(t *testing.T) {
-	pl := newProcessListener(nil)
+	pl := newProcessListener(nil, 0, 0, time.Now())
 	stopCh := make(chan struct{})
 	defer close(stopCh)
 	// make pop() block on nextCh: waiting for receiver to get notification.
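The controller-level surface of this change is intentionally small: when Config.ShouldResync is set, the reflector consults it each time the resync timer fires and skips store.Resync() when it returns false. A hedged sketch of how a Config consumer could use the hook; the queue, lister-watcher, process function, and the newGatedConfig helper itself are illustrative assumptions, not part of the patch:

```go
package example

import (
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/cache"
)

// newGatedConfig (hypothetical) wires a cache.Config whose periodic resyncs can
// be paused and resumed by flipping an atomic flag. Everything except the
// ShouldResync field behaves exactly as it did before this patch.
func newGatedConfig(queue cache.Queue, lw cache.ListerWatcher, objType runtime.Object,
	process cache.ProcessFunc, resyncActive *int32) *cache.Config {
	return &cache.Config{
		Queue:            queue,
		ListerWatcher:    lw,
		ObjectType:       objType,
		FullResyncPeriod: 30 * time.Second,
		// Consulted each time the reflector's resync timer fires; returning
		// false skips that store.Resync() call but leaves the timer running.
		ShouldResync: func() bool { return atomic.LoadInt32(resyncActive) == 1 },
		Process:      process,
		RetryOnError: false,
	}
}
```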
diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go
index b46ade6ec8..b43f43a5e7 100644
--- a/staging/src/k8s.io/client-go/tools/cache/reflector.go
+++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go
@@ -41,6 +41,7 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/util/clock"
 )
 
 // Reflector watches a specified resource and causes all changes to be reflected in the given store.
@@ -58,8 +59,9 @@ type Reflector struct {
 	// the beginning of the next one.
 	period       time.Duration
 	resyncPeriod time.Duration
-	// now() returns current time - exposed for testing purposes
-	now func() time.Time
+	ShouldResync func() bool
+	// clock allows tests to manipulate time
+	clock clock.Clock
 	// lastSyncResourceVersion is the resource version token last
 	// observed when doing a sync with the underlying store
 	// it is thread safe, but not synchronized with the underlying store
@@ -103,7 +105,7 @@ func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{},
 		expectedType: reflect.TypeOf(expectedType),
 		period:       time.Second,
 		resyncPeriod: resyncPeriod,
-		now:          time.Now,
+		clock:        &clock.RealClock{},
 	}
 	return r
 }
@@ -223,8 +225,8 @@ func (r *Reflector) resyncChan() (<-chan time.Time, func() bool) {
 	// always fail so we end up listing frequently. Then, if we don't
 	// manually stop the timer, we could end up with many timers active
 	// concurrently.
-	t := time.NewTimer(r.resyncPeriod)
-	return t.C, t.Stop
+	t := r.clock.NewTimer(r.resyncPeriod)
+	return t.C(), t.Stop
 }
 
 // ListAndWatch first lists all items and get the resource version at the moment of call,
@@ -270,10 +272,12 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 			case <-cancelCh:
 				return
 			}
-			glog.V(4).Infof("%s: forcing resync", r.name)
-			if err := r.store.Resync(); err != nil {
-				resyncerrc <- err
-				return
+			if r.ShouldResync == nil || r.ShouldResync() {
+				glog.V(4).Infof("%s: forcing resync", r.name)
+				if err := r.store.Resync(); err != nil {
+					resyncerrc <- err
+					return
+				}
 			}
 			cleanup()
 			resyncCh, cleanup = r.resyncChan()
@@ -334,7 +338,7 @@ func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) err
 
 // watchHandler watches w and keeps *resourceVersion up to date.
 func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
-	start := time.Now()
+	start := r.clock.Now()
 	eventCount := 0
 
 	// Stopping the watcher should be idempotent and if we return from this function there's no way
@@ -393,7 +397,7 @@ loop:
 		}
 	}
 
-	watchDuration := time.Now().Sub(start)
+	watchDuration := r.clock.Now().Sub(start)
 	if watchDuration < 1*time.Second && eventCount == 0 {
 		glog.V(4).Infof("%s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
 		return errors.New("very short watch")
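Replacing the `now func() time.Time` field with an injectable clock.Clock is what makes the resync machinery testable: both the timer and Now() can be pointed at a fake clock. A standalone sketch of the same timer pattern resyncChan now uses, assuming the vendored k8s.io/client-go/util/clock API:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/clock"
)

// resyncChan mirrors Reflector.resyncChan after this patch: the timer comes from
// the injected clock, so tests control exactly when it fires.
func resyncChan(c clock.Clock, period time.Duration) (<-chan time.Time, func() bool) {
	t := c.NewTimer(period)
	return t.C(), t.Stop
}

func main() {
	fake := clock.NewFakeClock(time.Now())
	ch, cleanup := resyncChan(fake, time.Minute)
	defer cleanup()

	fake.Step(time.Minute) // advance fake time past the deadline; no real waiting
	<-ch
	fmt.Println("resync timer fired deterministically")
}
```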
diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
index 840211effa..1349f335d0 100644
--- a/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
+++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer.go
@@ -24,28 +24,40 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/util/clock"
 
 	"github.com/golang/glog"
 )
 
-// if you use this, there is one behavior change compared to a standard Informer.
-// When you receive a notification, the cache will be AT LEAST as fresh as the
-// notification, but it MAY be more fresh. You should NOT depend on the contents
-// of the cache exactly matching the notification you've received in handler
-// functions. If there was a create, followed by a delete, the cache may NOT
-// have your item. This has advantages over the broadcaster since it allows us
-// to share a common cache across many controllers. Extending the broadcaster
-// would have required us keep duplicate caches for each watch.
+// SharedInformer has a shared data cache and is capable of distributing notifications for changes
+// to the cache to multiple listeners who registered via AddEventHandler. If you use this, there is
+// one behavior change compared to a standard Informer. When you receive a notification, the cache
+// will be AT LEAST as fresh as the notification, but it MAY be more fresh. You should NOT depend
+// on the contents of the cache exactly matching the notification you've received in handler
+// functions. If there was a create, followed by a delete, the cache may NOT have your item. This
+// has advantages over the broadcaster since it allows us to share a common cache across many
+// controllers. Extending the broadcaster would have required us to keep duplicate caches for each
+// watch.
 type SharedInformer interface {
-	// events to a single handler are delivered sequentially, but there is no coordination between different handlers
-	// You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned.
-	// TODO we should try to remove this restriction eventually.
-	AddEventHandler(handler ResourceEventHandler) error
+	// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
+	// period. Events to a single handler are delivered sequentially, but there is no coordination
+	// between different handlers.
+	AddEventHandler(handler ResourceEventHandler)
+	// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
+	// specified resync period. Events to a single handler are delivered sequentially, but there is
+	// no coordination between different handlers.
+	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
+	// GetStore returns the Store.
 	GetStore() Store
 	// GetController gives back a synthetic interface that "votes" to start the informer
 	GetController() Controller
+	// Run starts the shared informer, which will be stopped when stopCh is closed.
 	Run(stopCh <-chan struct{})
+	// HasSynced returns true if the shared informer's store has synced.
 	HasSynced() bool
+	// LastSyncResourceVersion is the resource version observed when last synced with the underlying
+	// store. The value returned is not synchronized with access to the underlying store and is not
+	// thread-safe.
 	LastSyncResourceVersion() string
 }
 
@@ -57,23 +69,22 @@ type SharedIndexInformer interface {
 }
 
 // NewSharedInformer creates a new instance for the listwatcher.
-// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
-// be shared amongst all consumers.
 func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
 	return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
 }
 
 // NewSharedIndexInformer creates a new instance for the listwatcher.
-// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
-// be shared amongst all consumers.
-func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
+func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
+	realClock := &clock.RealClock{}
 	sharedIndexInformer := &sharedIndexInformer{
-		processor:             &sharedProcessor{},
-		indexer:               NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
-		listerWatcher:         lw,
-		objectType:            objType,
-		fullResyncPeriod:      resyncPeriod,
-		cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
+		processor:                       &sharedProcessor{clock: realClock},
+		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
+		listerWatcher:                   lw,
+		objectType:                      objType,
+		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
+		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
+		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
+		clock:                           realClock,
 	}
 	return sharedIndexInformer
 }
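For callers, the visible API change is the interface above: AddEventHandler no longer returns an error (handlers can now join a running informer), and AddEventHandlerWithResyncPeriod accepts a per-handler cadence. A usage sketch; the lister-watcher, object type, and handler bodies are placeholders:

```go
package example

import (
	"time"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/cache"
)

// runShared shows one informer feeding two handlers with different cadences.
func runShared(lw cache.ListerWatcher, objType runtime.Object, stopCh <-chan struct{}) {
	// 30s becomes both the default handler resync period and the informer's
	// initial resyncCheckPeriod.
	informer := cache.NewSharedInformer(lw, objType, 30*time.Second)

	// Inherits the 30s default.
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { /* react to adds */ },
	})

	// A resyncPeriod of 0 opts this handler out of resyncs entirely; it still
	// receives ordinary add/update/delete notifications.
	informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) { /* react to updates */ },
	}, 0)

	informer.Run(stopCh)
}
```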
@@ -114,9 +125,18 @@ type sharedIndexInformer struct {
 	cacheMutationDetector CacheMutationDetector
 
 	// This block is tracked to handle late initialization of the controller
-	listerWatcher    ListerWatcher
-	objectType       runtime.Object
-	fullResyncPeriod time.Duration
+	listerWatcher ListerWatcher
+	objectType    runtime.Object
+
+	// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
+	// shouldResync to check if any of our listeners need a resync.
+	resyncCheckPeriod time.Duration
+	// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
+	// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
+	// value).
+	defaultEventHandlerResyncPeriod time.Duration
+	// clock allows for testability
+	clock clock.Clock
 
 	started     bool
 	startedLock sync.Mutex
@@ -171,8 +191,9 @@ func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
 		Queue:            fifo,
 		ListerWatcher:    s.listerWatcher,
 		ObjectType:       s.objectType,
-		FullResyncPeriod: s.fullResyncPeriod,
+		FullResyncPeriod: s.resyncCheckPeriod,
 		RetryOnError:     false,
+		ShouldResync:     s.processor.shouldResync,
 
 		Process: s.HandleDeltas,
 	}
@@ -182,6 +203,7 @@
 		defer s.startedLock.Unlock()
 
 		s.controller = New(cfg)
+		s.controller.(*controller).clock = s.clock
 		s.started = true
 	}()
 
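The field comments above carry the core design: the reflector timer fires every resyncCheckPeriod, and each tick merely asks the listeners whether any of them are due. When all handlers are registered before Run, the check period should settle at the smallest non-zero handler period, clamped to the one-second floor introduced below. A restatement of that rule as a standalone helper; this function does not exist in the package:

```go
package example

import "time"

// effectiveCheckPeriod restates, outside the informer, how resyncCheckPeriod is
// expected to settle when all handlers are added before Run: the smallest
// non-zero handler period (raised to the 1s minimum), or the informer default
// if no handler asks for less. Illustrative only.
func effectiveCheckPeriod(informerDefault time.Duration, handlerPeriods ...time.Duration) time.Duration {
	check := informerDefault
	for _, p := range handlerPeriods {
		if p == 0 {
			continue // handler opted out of resyncs; it never lowers the timer
		}
		if p < time.Second {
			p = time.Second // the minimumResyncPeriod clamp from the next hunk
		}
		if p < check {
			check = p
		}
	}
	return check
}
```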
@@ -240,14 +262,56 @@ func (s *sharedIndexInformer) GetController() Controller {
 	return &dummyController{informer: s}
 }
 
-func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
+func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
+	s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
+}
+
+func determineResyncPeriod(desired, check time.Duration) time.Duration {
+	if desired == 0 {
+		return desired
+	}
+	if check == 0 {
+		glog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
+		return 0
+	}
+	if desired < check {
+		glog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
+		return check
+	}
+	return desired
+}
+
+const minimumResyncPeriod = 1 * time.Second
+
+func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
 	s.startedLock.Lock()
 	defer s.startedLock.Unlock()
 
+	if resyncPeriod > 0 {
+		if resyncPeriod < minimumResyncPeriod {
+			glog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
+			resyncPeriod = minimumResyncPeriod
+		}
+
+		if resyncPeriod < s.resyncCheckPeriod {
+			if s.started {
+				glog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
+				resyncPeriod = s.resyncCheckPeriod
+			} else {
+				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
+				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
+				// accordingly
+				s.resyncCheckPeriod = resyncPeriod
+				s.processor.resyncCheckPeriodChanged(resyncPeriod)
+			}
+		}
+	}
+
+	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
+
 	if !s.started {
-		listener := newProcessListener(handler)
-		s.processor.listeners = append(s.processor.listeners, listener)
-		return nil
+		s.processor.addListener(listener)
+		return
 	}
 
 	// in order to safely join, we have to
@@ -258,8 +322,7 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
 	s.blockDeltas.Lock()
 	defer s.blockDeltas.Unlock()
 
-	listener := newProcessListener(handler)
-	s.processor.listeners = append(s.processor.listeners, listener)
+	s.processor.addListener(listener)
 	go listener.run(s.stopCh)
 	go listener.pop(s.stopCh)
 
@@ -268,8 +331,6 @@ func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) erro
 	for i := range items {
 		listener.add(addNotification{newObj: items[i]})
 	}
-
-	return nil
 }
 
 func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
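determineResyncPeriod is the clamp that keeps any single listener from demanding resyncs faster than the shared timer fires. A standalone copy of it (logging dropped) with a few worked inputs:

```go
package main

import (
	"fmt"
	"time"
)

// determineResyncPeriod is copied from the hunk above, minus the log lines, so
// the clamping rules can be exercised on their own.
func determineResyncPeriod(desired, check time.Duration) time.Duration {
	if desired == 0 {
		return desired // zero means "never resync this listener"
	}
	if check == 0 {
		return 0 // the informer itself doesn't support resync
	}
	if desired < check {
		return check // can't be notified more often than the timer fires
	}
	return desired
}

func main() {
	check := 10 * time.Second
	for _, desired := range []time.Duration{0, 5 * time.Second, 10 * time.Second, 2 * time.Minute} {
		fmt.Printf("requested %v -> effective %v\n", desired, determineResyncPeriod(desired, check))
	}
	// requested 0s -> effective 0s
	// requested 5s -> effective 10s
	// requested 10s -> effective 10s
	// requested 2m0s -> effective 2m0s
}
```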
@@ -280,45 +341,101 @@
 	for _, d := range obj.(Deltas) {
 		switch d.Type {
 		case Sync, Added, Updated:
+			isSync := d.Type == Sync
 			s.cacheMutationDetector.AddObject(d.Object)
 			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
 				if err := s.indexer.Update(d.Object); err != nil {
 					return err
 				}
-				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
+				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
 			} else {
 				if err := s.indexer.Add(d.Object); err != nil {
 					return err
 				}
-				s.processor.distribute(addNotification{newObj: d.Object})
+				s.processor.distribute(addNotification{newObj: d.Object}, isSync)
 			}
 		case Deleted:
 			if err := s.indexer.Delete(d.Object); err != nil {
 				return err
 			}
-			s.processor.distribute(deleteNotification{oldObj: d.Object})
+			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
 		}
 	}
 	return nil
 }
 
 type sharedProcessor struct {
-	listeners []*processorListener
+	listenersLock    sync.RWMutex
+	listeners        []*processorListener
+	syncingListeners []*processorListener
+	clock            clock.Clock
 }
 
-func (p *sharedProcessor) distribute(obj interface{}) {
-	for _, listener := range p.listeners {
-		listener.add(obj)
+func (p *sharedProcessor) addListener(listener *processorListener) {
+	p.listenersLock.Lock()
+	defer p.listenersLock.Unlock()
+
+	p.listeners = append(p.listeners, listener)
+	p.syncingListeners = append(p.syncingListeners, listener)
+}
+
+func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
+	p.listenersLock.RLock()
+	defer p.listenersLock.RUnlock()
+
+	if sync {
+		for _, listener := range p.syncingListeners {
+			listener.add(obj)
+		}
+	} else {
+		for _, listener := range p.listeners {
+			listener.add(obj)
+		}
 	}
 }
 
 func (p *sharedProcessor) run(stopCh <-chan struct{}) {
+	p.listenersLock.RLock()
+	defer p.listenersLock.RUnlock()
+
 	for _, listener := range p.listeners {
 		go listener.run(stopCh)
 		go listener.pop(stopCh)
 	}
 }
 
+// shouldResync queries every listener to determine if any of them need a resync, based on each
+// listener's resyncPeriod.
+func (p *sharedProcessor) shouldResync() bool {
+	p.listenersLock.Lock()
+	defer p.listenersLock.Unlock()
+
+	p.syncingListeners = []*processorListener{}
+
+	resyncNeeded := false
+	now := p.clock.Now()
+	for _, listener := range p.listeners {
+		// need to loop through all the listeners to see if they need to resync so we can prepare any
+		// listeners that are going to be resyncing.
+		if listener.shouldResync(now) {
+			resyncNeeded = true
+			p.syncingListeners = append(p.syncingListeners, listener)
+			listener.determineNextResync(now)
+		}
+	}
+	return resyncNeeded
+}
+
+func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
+	p.listenersLock.RLock()
+	defer p.listenersLock.RUnlock()
+
+	for _, listener := range p.listeners {
+		resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
+		listener.setResyncPeriod(resyncPeriod)
+	}
+}
+
 type processorListener struct {
 	// lock/cond protects access to 'pendingNotifications'.
 	lock sync.RWMutex
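The processor now maintains two listener sets: every notification fans out to listeners, while Sync notifications reach only syncingListeners, which shouldResync rebuilds on each reflector tick. The real types are unexported, so this is a stripped-down standalone model of that bookkeeping, not code from the package:

```go
package main

import (
	"fmt"
	"time"
)

// listener models just the resync bookkeeping of processorListener.
type listener struct {
	name         string
	resyncPeriod time.Duration
	nextResync   time.Time
}

// shouldResync mirrors sharedProcessor.shouldResync: collect the listeners that
// are due and push each one's next deadline forward.
func shouldResync(listeners []*listener, now time.Time) (syncing []*listener) {
	for _, l := range listeners {
		if l.resyncPeriod == 0 {
			continue // never resyncs
		}
		if !now.Before(l.nextResync) { // i.e. now.After(next) || now.Equal(next)
			l.nextResync = now.Add(l.resyncPeriod)
			syncing = append(syncing, l)
		}
	}
	return syncing
}

func main() {
	start := time.Now()
	listeners := []*listener{
		{name: "never", resyncPeriod: 0},
		{name: "every2s", resyncPeriod: 2 * time.Second, nextResync: start.Add(2 * time.Second)},
		{name: "every3s", resyncPeriod: 3 * time.Second, nextResync: start.Add(3 * time.Second)},
	}
	// Two seconds in, only the 2s listener is due, so the Sync deltas produced
	// by this resync are distributed to it alone.
	for _, l := range shouldResync(listeners, start.Add(2*time.Second)) {
		fmt.Println("due for resync:", l.name)
	}
}
```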
@@ -334,16 +451,32 @@
 	nextCh chan interface{}
 
 	handler ResourceEventHandler
+
+	// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
+	requestedResyncPeriod time.Duration
+	// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
+	// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
+	// informer's overall resync check period.
+	resyncPeriod time.Duration
+	// nextResync is the earliest time the listener should get a full resync
+	nextResync time.Time
+	// resyncLock guards access to resyncPeriod and nextResync
+	resyncLock sync.Mutex
 }
 
-func newProcessListener(handler ResourceEventHandler) *processorListener {
+func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
 	ret := &processorListener{
-		pendingNotifications: []interface{}{},
-		nextCh:               make(chan interface{}),
-		handler:              handler,
+		pendingNotifications:  []interface{}{},
+		nextCh:                make(chan interface{}),
+		handler:               handler,
+		requestedResyncPeriod: requestedResyncPeriod,
+		resyncPeriod:          resyncPeriod,
 	}
 
 	ret.cond.L = &ret.lock
+
+	ret.determineNextResync(now)
+
 	return ret
 }
@@ -419,3 +552,30 @@ func (p *processorListener) run(stopCh <-chan struct{}) {
 		}
 	}
 }
+
+// shouldResync determines if the listener needs a resync. If the listener's resyncPeriod is 0,
+// this always returns false.
+func (p *processorListener) shouldResync(now time.Time) bool {
+	p.resyncLock.Lock()
+	defer p.resyncLock.Unlock()
+
+	if p.resyncPeriod == 0 {
+		return false
+	}
+
+	return now.After(p.nextResync) || now.Equal(p.nextResync)
+}
+
+func (p *processorListener) determineNextResync(now time.Time) {
+	p.resyncLock.Lock()
+	defer p.resyncLock.Unlock()
+
+	p.nextResync = now.Add(p.resyncPeriod)
+}
+
+func (p *processorListener) setResyncPeriod(resyncPeriod time.Duration) {
+	p.resyncLock.Lock()
+	defer p.resyncLock.Unlock()
+
+	p.resyncPeriod = resyncPeriod
+}
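A subtlety in the listener-side bookkeeping above: a listener is only examined when the reflector timer fires, and determineNextResync measures from the check time, so the observed resync interval is the requested period rounded up to the next timer tick. A small simulation under assumed values (10s check period, 55s request):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	check := 10 * time.Second  // reflector timer cadence (resyncCheckPeriod)
	period := 55 * time.Second // the listener's (clamped) resyncPeriod

	next := period // determineNextResync at construction: now + resyncPeriod
	last := time.Duration(0)
	for tick := check; tick <= 4*time.Minute; tick += check {
		if tick >= next { // listener.shouldResync(now)
			fmt.Printf("resync at %v (gap %v)\n", tick, tick-last)
			last = tick
			next = tick + period // listener.determineNextResync(now)
		}
	}
	// Prints resyncs at 1m0s, 2m0s, 3m0s, 4m0s: the 55s request lands on the
	// 10s grid, so the observed interval is 60s, never less than requested.
}
```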
diff --git a/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go
new file mode 100644
index 0000000000..1115747988
--- /dev/null
+++ b/staging/src/k8s.io/client-go/tools/cache/shared_informer_test.go
@@ -0,0 +1,253 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cache
+
+import (
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/pkg/api"
+	fcache "k8s.io/client-go/tools/cache/testing"
+	"k8s.io/client-go/util/clock"
+)
+
+type testListener struct {
+	lock              sync.RWMutex
+	resyncPeriod      time.Duration
+	expectedItemNames sets.String
+	receivedItemNames []string
+	name              string
+}
+
+func newTestListener(name string, resyncPeriod time.Duration, expected ...string) *testListener {
+	l := &testListener{
+		resyncPeriod:      resyncPeriod,
+		expectedItemNames: sets.NewString(expected...),
+		name:              name,
+	}
+	return l
+}
+
+func (l *testListener) OnAdd(obj interface{}) {
+	l.handle(obj)
+}
+
+func (l *testListener) OnUpdate(old, new interface{}) {
+	l.handle(new)
+}
+
+func (l *testListener) OnDelete(obj interface{}) {
+}
+
+func (l *testListener) handle(obj interface{}) {
+	key, _ := MetaNamespaceKeyFunc(obj)
+	fmt.Printf("%s: handle: %v\n", l.name, key)
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	objectMeta, _ := meta.Accessor(obj)
+	l.receivedItemNames = append(l.receivedItemNames, objectMeta.GetName())
+}
+
+func (l *testListener) ok() bool {
+	fmt.Println("polling")
+	err := wait.PollImmediate(100*time.Millisecond, 2*time.Second, func() (bool, error) {
+		if l.satisfiedExpectations() {
+			return true, nil
+		}
+		return false, nil
+	})
+	if err != nil {
+		return false
+	}
+
+	// wait just a bit to allow any unexpected stragglers to come in
+	fmt.Println("sleeping")
+	time.Sleep(1 * time.Second)
+	fmt.Println("final check")
+	return l.satisfiedExpectations()
+}
+
+func (l *testListener) satisfiedExpectations() bool {
+	l.lock.RLock()
+	defer l.lock.RUnlock()
+
+	return len(l.receivedItemNames) == l.expectedItemNames.Len() && sets.NewString(l.receivedItemNames...).Equal(l.expectedItemNames)
+}
+
+func TestListenerResyncPeriods(t *testing.T) {
+	// source simulates an apiserver object endpoint.
+	source := fcache.NewFakeControllerSource()
+	source.Add(&api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
+	source.Add(&api.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
+
+	// create the shared informer and resync every 1s
+	informer := NewSharedInformer(source, &api.Pod{}, 1*time.Second).(*sharedIndexInformer)
+
+	clock := clock.NewFakeClock(time.Now())
+	informer.clock = clock
+	informer.processor.clock = clock
+
+	// listener 1, never resync
+	listener1 := newTestListener("listener1", 0, "pod1", "pod2")
+	informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
+
+	// listener 2, resync every 2s
+	listener2 := newTestListener("listener2", 2*time.Second, "pod1", "pod2")
+	informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
+
+	// listener 3, resync every 3s
+	listener3 := newTestListener("listener3", 3*time.Second, "pod1", "pod2")
+	informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
+	listeners := []*testListener{listener1, listener2, listener3}
+
+	stop := make(chan struct{})
+	defer close(stop)
+
+	go informer.Run(stop)
+
+	// ensure all listeners got the initial List
+	for _, listener := range listeners {
+		if !listener.ok() {
+			t.Errorf("%s: expected %v, got %v", listener.name, listener.expectedItemNames, listener.receivedItemNames)
+		}
+	}
+
+	// reset
+	for _, listener := range listeners {
+		listener.receivedItemNames = []string{}
+	}
+
+	// advance so listener2 gets a resync
+	clock.Step(2 * time.Second)
+
+	// make sure listener2 got the resync
+	if !listener2.ok() {
+		t.Errorf("%s: expected %v, got %v", listener2.name, listener2.expectedItemNames, listener2.receivedItemNames)
+	}
+
+	// wait a bit to give errant items a chance to go to 1 and 3
+	time.Sleep(1 * time.Second)
+
+	// make sure listeners 1 and 3 got nothing
+	if len(listener1.receivedItemNames) != 0 {
+		t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
+	}
+	if len(listener3.receivedItemNames) != 0 {
+		t.Errorf("listener3: should not have resynced (got %d)", len(listener3.receivedItemNames))
+	}
+
+	// reset
+	for _, listener := range listeners {
+		listener.receivedItemNames = []string{}
+	}
+
+	// advance so listener3 gets a resync
+	clock.Step(1 * time.Second)
+
+	// make sure listener3 got the resync
+	if !listener3.ok() {
+		t.Errorf("%s: expected %v, got %v", listener3.name, listener3.expectedItemNames, listener3.receivedItemNames)
+	}
+
+	// wait a bit to give errant items a chance to go to 1 and 2
+	time.Sleep(1 * time.Second)
+
+	// make sure listeners 1 and 2 got nothing
+	if len(listener1.receivedItemNames) != 0 {
+		t.Errorf("listener1: should not have resynced (got %d)", len(listener1.receivedItemNames))
+	}
+	if len(listener2.receivedItemNames) != 0 {
+		t.Errorf("listener2: should not have resynced (got %d)", len(listener2.receivedItemNames))
+	}
+}
+
+func TestResyncCheckPeriod(t *testing.T) {
+	// source simulates an apiserver object endpoint.
+	source := fcache.NewFakeControllerSource()
+
+	// create the shared informer and resync every 12 hours
+	informer := NewSharedInformer(source, &api.Pod{}, 12*time.Hour).(*sharedIndexInformer)
+
+	clock := clock.NewFakeClock(time.Now())
+	informer.clock = clock
+	informer.processor.clock = clock
+
+	// listener 1, never resync
+	listener1 := newTestListener("listener1", 0)
+	informer.AddEventHandlerWithResyncPeriod(listener1, listener1.resyncPeriod)
+	if e, a := 12*time.Hour, informer.resyncCheckPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+
+	// listener 2, resync every minute
+	listener2 := newTestListener("listener2", 1*time.Minute)
+	informer.AddEventHandlerWithResyncPeriod(listener2, listener2.resyncPeriod)
+	if e, a := 1*time.Minute, informer.resyncCheckPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+
+	// listener 3, resync every 55 seconds
+	listener3 := newTestListener("listener3", 55*time.Second)
+	informer.AddEventHandlerWithResyncPeriod(listener3, listener3.resyncPeriod)
+	if e, a := 55*time.Second, informer.resyncCheckPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+
+	// listener 4, resync every 5 seconds
+	listener4 := newTestListener("listener4", 5*time.Second)
+	informer.AddEventHandlerWithResyncPeriod(listener4, listener4.resyncPeriod)
+	if e, a := 5*time.Second, informer.resyncCheckPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := time.Duration(0), informer.processor.listeners[0].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := 1*time.Minute, informer.processor.listeners[1].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := 55*time.Second, informer.processor.listeners[2].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+	if e, a := 5*time.Second, informer.processor.listeners[3].resyncPeriod; e != a {
+		t.Errorf("expected %d, got %d", e, a)
+	}
+}
diff --git a/vendor/BUILD b/vendor/BUILD
index a091d671e8..367360faed 100644
--- a/vendor/BUILD
+++ b/vendor/BUILD
@@ -12403,6 +12403,7 @@ go_test(
         "k8s.io/client-go/tools/cache/mutation_detector_test.go",
         "k8s.io/client-go/tools/cache/processor_listener_test.go",
         "k8s.io/client-go/tools/cache/reflector_test.go",
+        "k8s.io/client-go/tools/cache/shared_informer_test.go",
         "k8s.io/client-go/tools/cache/store_test.go",
         "k8s.io/client-go/tools/cache/undelta_store_test.go",
     ],
@@ -12410,6 +12411,7 @@ go_test(
     tags = ["automanaged"],
     deps = [
         "//vendor:github.com/google/gofuzz",
+        "//vendor:k8s.io/apimachinery/pkg/api/meta",
        "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
        "//vendor:k8s.io/apimachinery/pkg/runtime",
        "//vendor:k8s.io/apimachinery/pkg/util/sets",