2016-04-14 17:10:57 +00:00
|
|
|
/*
|
2016-06-03 00:25:58 +00:00
|
|
|
Copyright 2015 The Kubernetes Authors.
|
2016-04-14 17:10:57 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package framework
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"k8s.io/kubernetes/pkg/client/cache"
|
|
|
|
"k8s.io/kubernetes/pkg/runtime"
|
|
|
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
|
|
|
)
|
|
|
|
|
|
|
|
// if you use this, there is one behavior change compared to a standard Informer.
|
|
|
|
// When you receive a notification, the cache will be AT LEAST as fresh as the
|
|
|
|
// notification, but it MAY be more fresh. You should NOT depend on the contents
|
|
|
|
// of the cache exactly matching the notification you've received in handler
|
|
|
|
// functions. If there was a create, followed by a delete, the cache may NOT
|
|
|
|
// have your item. This has advantages over the broadcaster since it allows us
|
|
|
|
// to share a common cache across many controllers. Extending the broadcaster
|
|
|
|
// would have required us keep duplicate caches for each watch.
|
|
|
|
type SharedInformer interface {
|
|
|
|
// events to a single handler are delivered sequentially, but there is no coordination between different handlers
|
|
|
|
// You may NOT add a handler *after* the SharedInformer is running. That will result in an error being returned.
|
|
|
|
// TODO we should try to remove this restriction eventually.
|
|
|
|
AddEventHandler(handler ResourceEventHandler) error
|
|
|
|
GetStore() cache.Store
|
|
|
|
// GetController gives back a synthetic interface that "votes" to start the informer
|
|
|
|
GetController() ControllerInterface
|
|
|
|
Run(stopCh <-chan struct{})
|
|
|
|
HasSynced() bool
|
2016-06-21 17:13:20 +00:00
|
|
|
LastSyncResourceVersion() string
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type SharedIndexInformer interface {
|
|
|
|
SharedInformer
|
2016-05-02 04:35:18 +00:00
|
|
|
// AddIndexers add indexers to the informer before it starts.
|
2016-04-07 12:15:21 +00:00
|
|
|
AddIndexers(indexers cache.Indexers) error
|
2016-04-14 17:10:57 +00:00
|
|
|
GetIndexer() cache.Indexer
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewSharedInformer creates a new instance for the listwatcher.
|
|
|
|
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
|
|
|
|
// be shared amongst all consumers.
|
|
|
|
func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
|
2016-05-02 11:49:46 +00:00
|
|
|
return NewSharedIndexInformer(lw, objType, resyncPeriod, cache.Indexers{})
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
2016-05-02 04:35:18 +00:00
|
|
|
// NewSharedIndexInformer creates a new instance for the listwatcher.
|
2016-04-07 12:15:21 +00:00
|
|
|
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
|
|
|
|
// be shared amongst all consumers.
|
|
|
|
func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer {
|
|
|
|
sharedIndexInformer := &sharedIndexInformer{
|
2016-05-02 04:35:18 +00:00
|
|
|
processor: &sharedProcessor{},
|
|
|
|
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
|
|
|
|
listerWatcher: lw,
|
|
|
|
objectType: objType,
|
|
|
|
fullResyncPeriod: resyncPeriod,
|
2016-04-07 12:15:21 +00:00
|
|
|
}
|
|
|
|
return sharedIndexInformer
|
|
|
|
}
|
|
|
|
|
|
|
|
type sharedIndexInformer struct {
|
|
|
|
indexer cache.Indexer
|
2016-04-14 17:10:57 +00:00
|
|
|
controller *Controller
|
|
|
|
|
|
|
|
processor *sharedProcessor
|
|
|
|
|
2016-05-02 04:35:18 +00:00
|
|
|
// This block is tracked to handle late initialization of the controller
|
|
|
|
listerWatcher cache.ListerWatcher
|
|
|
|
objectType runtime.Object
|
|
|
|
fullResyncPeriod time.Duration
|
|
|
|
|
2016-04-14 17:10:57 +00:00
|
|
|
started bool
|
|
|
|
startedLock sync.Mutex
|
2016-07-01 20:18:57 +00:00
|
|
|
|
|
|
|
// blockDeltas gives a way to stop all event distribution so that a late event handler
|
|
|
|
// can safely join the shared informer.
|
|
|
|
blockDeltas sync.Mutex
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// dummyController hides the fact that a SharedInformer is different from a dedicated one
|
|
|
|
// where a caller can `Run`. The run method is disonnected in this case, because higher
|
|
|
|
// level logic will decide when to start the SharedInformer and related controller.
|
|
|
|
// Because returning information back is always asynchronous, the legacy callers shouldn't
|
|
|
|
// notice any change in behavior.
|
|
|
|
type dummyController struct {
|
2016-04-07 12:15:21 +00:00
|
|
|
informer *sharedIndexInformer
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (v *dummyController) Run(stopCh <-chan struct{}) {
|
|
|
|
}
|
|
|
|
|
|
|
|
func (v *dummyController) HasSynced() bool {
|
|
|
|
return v.informer.HasSynced()
|
|
|
|
}
|
|
|
|
|
|
|
|
type updateNotification struct {
|
|
|
|
oldObj interface{}
|
|
|
|
newObj interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
type addNotification struct {
|
|
|
|
newObj interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
type deleteNotification struct {
|
|
|
|
oldObj interface{}
|
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
|
2016-04-14 17:10:57 +00:00
|
|
|
defer utilruntime.HandleCrash()
|
|
|
|
|
2016-05-02 04:35:18 +00:00
|
|
|
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.indexer)
|
|
|
|
|
|
|
|
cfg := &Config{
|
|
|
|
Queue: fifo,
|
|
|
|
ListerWatcher: s.listerWatcher,
|
|
|
|
ObjectType: s.objectType,
|
|
|
|
FullResyncPeriod: s.fullResyncPeriod,
|
|
|
|
RetryOnError: false,
|
|
|
|
|
|
|
|
Process: s.HandleDeltas,
|
|
|
|
}
|
|
|
|
|
2016-04-14 17:10:57 +00:00
|
|
|
func() {
|
|
|
|
s.startedLock.Lock()
|
|
|
|
defer s.startedLock.Unlock()
|
2016-05-16 13:53:45 +00:00
|
|
|
|
|
|
|
s.controller = New(cfg)
|
2016-04-14 17:10:57 +00:00
|
|
|
s.started = true
|
|
|
|
}()
|
|
|
|
|
|
|
|
s.processor.run(stopCh)
|
|
|
|
s.controller.Run(stopCh)
|
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) isStarted() bool {
|
2016-04-14 17:10:57 +00:00
|
|
|
s.startedLock.Lock()
|
|
|
|
defer s.startedLock.Unlock()
|
|
|
|
return s.started
|
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) HasSynced() bool {
|
2016-05-16 13:53:45 +00:00
|
|
|
s.startedLock.Lock()
|
|
|
|
defer s.startedLock.Unlock()
|
|
|
|
|
|
|
|
if s.controller == nil {
|
|
|
|
return false
|
|
|
|
}
|
2016-04-14 17:10:57 +00:00
|
|
|
return s.controller.HasSynced()
|
|
|
|
}
|
|
|
|
|
2016-06-21 17:13:20 +00:00
|
|
|
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
|
|
|
|
s.startedLock.Lock()
|
|
|
|
defer s.startedLock.Unlock()
|
|
|
|
|
|
|
|
if s.controller == nil {
|
|
|
|
return ""
|
|
|
|
}
|
|
|
|
return s.controller.reflector.LastSyncResourceVersion()
|
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) GetStore() cache.Store {
|
|
|
|
return s.indexer
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *sharedIndexInformer) GetIndexer() cache.Indexer {
|
|
|
|
return s.indexer
|
|
|
|
}
|
|
|
|
|
|
|
|
func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
|
2016-05-02 04:35:18 +00:00
|
|
|
s.startedLock.Lock()
|
|
|
|
defer s.startedLock.Unlock()
|
|
|
|
|
|
|
|
if s.started {
|
|
|
|
return fmt.Errorf("informer has already started")
|
|
|
|
}
|
|
|
|
|
2016-05-02 11:49:46 +00:00
|
|
|
return s.indexer.AddIndexers(indexers)
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) GetController() ControllerInterface {
|
2016-04-14 17:10:57 +00:00
|
|
|
return &dummyController{informer: s}
|
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) error {
|
2016-04-14 17:10:57 +00:00
|
|
|
s.startedLock.Lock()
|
|
|
|
defer s.startedLock.Unlock()
|
|
|
|
|
2016-07-01 20:18:57 +00:00
|
|
|
if !s.started {
|
|
|
|
listener := newProcessListener(handler)
|
|
|
|
s.processor.listeners = append(s.processor.listeners, listener)
|
|
|
|
return nil
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
2016-07-01 20:18:57 +00:00
|
|
|
// in order to safely join, we have to
|
|
|
|
// 1. stop sending add/update/delete notifications
|
|
|
|
// 2. do a list against the store
|
|
|
|
// 3. send synthetic "Add" events to the new handler
|
|
|
|
// 4. unblock
|
|
|
|
s.blockDeltas.Lock()
|
|
|
|
defer s.blockDeltas.Unlock()
|
|
|
|
|
2016-04-14 17:10:57 +00:00
|
|
|
listener := newProcessListener(handler)
|
|
|
|
s.processor.listeners = append(s.processor.listeners, listener)
|
2016-07-01 20:18:57 +00:00
|
|
|
|
|
|
|
items := s.indexer.List()
|
|
|
|
for i := range items {
|
|
|
|
listener.add(addNotification{newObj: items[i]})
|
|
|
|
}
|
|
|
|
|
2016-04-14 17:10:57 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-04-07 12:15:21 +00:00
|
|
|
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
|
2016-07-01 20:18:57 +00:00
|
|
|
s.blockDeltas.Lock()
|
|
|
|
defer s.blockDeltas.Unlock()
|
|
|
|
|
2016-04-14 17:10:57 +00:00
|
|
|
// from oldest to newest
|
|
|
|
for _, d := range obj.(cache.Deltas) {
|
|
|
|
switch d.Type {
|
|
|
|
case cache.Sync, cache.Added, cache.Updated:
|
2016-04-07 12:15:21 +00:00
|
|
|
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
|
|
|
|
if err := s.indexer.Update(d.Object); err != nil {
|
2016-04-14 17:10:57 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object})
|
|
|
|
} else {
|
2016-04-07 12:15:21 +00:00
|
|
|
if err := s.indexer.Add(d.Object); err != nil {
|
2016-04-14 17:10:57 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
s.processor.distribute(addNotification{newObj: d.Object})
|
|
|
|
}
|
|
|
|
case cache.Deleted:
|
2016-04-07 12:15:21 +00:00
|
|
|
if err := s.indexer.Delete(d.Object); err != nil {
|
2016-04-14 17:10:57 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
s.processor.distribute(deleteNotification{oldObj: d.Object})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type sharedProcessor struct {
|
|
|
|
listeners []*processorListener
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *sharedProcessor) distribute(obj interface{}) {
|
|
|
|
for _, listener := range p.listeners {
|
|
|
|
listener.add(obj)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
|
|
|
|
for _, listener := range p.listeners {
|
|
|
|
go listener.run(stopCh)
|
|
|
|
go listener.pop(stopCh)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type processorListener struct {
|
|
|
|
// lock/cond protects access to 'pendingNotifications'.
|
|
|
|
lock sync.RWMutex
|
|
|
|
cond sync.Cond
|
|
|
|
|
|
|
|
// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
|
|
|
|
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
|
|
|
|
// added until we OOM.
|
|
|
|
// TODO This is no worse that before, since reflectors were backed by unbounded DeltaFIFOs, but
|
|
|
|
// we should try to do something better
|
|
|
|
pendingNotifications []interface{}
|
|
|
|
|
|
|
|
nextCh chan interface{}
|
|
|
|
|
|
|
|
handler ResourceEventHandler
|
|
|
|
}
|
|
|
|
|
|
|
|
func newProcessListener(handler ResourceEventHandler) *processorListener {
|
|
|
|
ret := &processorListener{
|
|
|
|
pendingNotifications: []interface{}{},
|
|
|
|
nextCh: make(chan interface{}),
|
|
|
|
handler: handler,
|
|
|
|
}
|
|
|
|
|
|
|
|
ret.cond.L = &ret.lock
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *processorListener) add(notification interface{}) {
|
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
|
|
|
|
p.pendingNotifications = append(p.pendingNotifications, notification)
|
|
|
|
p.cond.Broadcast()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *processorListener) pop(stopCh <-chan struct{}) {
|
|
|
|
defer utilruntime.HandleCrash()
|
|
|
|
|
|
|
|
for {
|
2016-06-08 16:20:06 +00:00
|
|
|
blockingGet := func() (interface{}, bool) {
|
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
|
|
|
|
for len(p.pendingNotifications) == 0 {
|
|
|
|
// check if we're shutdown
|
|
|
|
select {
|
|
|
|
case <-stopCh:
|
|
|
|
return nil, true
|
|
|
|
default:
|
|
|
|
}
|
|
|
|
p.cond.Wait()
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
2016-06-08 16:20:06 +00:00
|
|
|
nt := p.pendingNotifications[0]
|
|
|
|
p.pendingNotifications = p.pendingNotifications[1:]
|
|
|
|
return nt, false
|
|
|
|
}
|
|
|
|
|
|
|
|
notification, stopped := blockingGet()
|
|
|
|
if stopped {
|
|
|
|
return
|
2016-04-14 17:10:57 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
select {
|
|
|
|
case <-stopCh:
|
|
|
|
return
|
|
|
|
case p.nextCh <- notification:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *processorListener) run(stopCh <-chan struct{}) {
|
|
|
|
defer utilruntime.HandleCrash()
|
|
|
|
|
|
|
|
for {
|
|
|
|
var next interface{}
|
|
|
|
select {
|
|
|
|
case <-stopCh:
|
|
|
|
func() {
|
|
|
|
p.lock.Lock()
|
|
|
|
defer p.lock.Unlock()
|
|
|
|
p.cond.Broadcast()
|
|
|
|
}()
|
|
|
|
return
|
|
|
|
case next = <-p.nextCh:
|
|
|
|
}
|
|
|
|
|
|
|
|
switch notification := next.(type) {
|
|
|
|
case updateNotification:
|
|
|
|
p.handler.OnUpdate(notification.oldObj, notification.newObj)
|
|
|
|
case addNotification:
|
|
|
|
p.handler.OnAdd(notification.newObj)
|
|
|
|
case deleteNotification:
|
|
|
|
p.handler.OnDelete(notification.oldObj)
|
|
|
|
default:
|
|
|
|
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %#v", next))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|