implement AddIndexers for SharedIndexInformer

pull/6/head
mqliang 2016-05-02 12:35:18 +08:00
parent 9011207f18
commit c10f43a2e5
17 changed files with 107 additions and 61 deletions

View File

@ -38,7 +38,6 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient"
@ -195,7 +194,7 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()
podInformer := informers.CreateSharedIndexPodInformer(clientset, controller.NoResyncPeriodFunc(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := informers.CreateSharedPodIndexInformer(clientset, controller.NoResyncPeriodFunc())
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpointcontroller.NewEndpointController(podInformer, clientset).

View File

@ -37,7 +37,6 @@ import (
"k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record"
@ -195,7 +194,7 @@ func Run(s *options.CMServer) error {
}
func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig *restclient.Config, stop <-chan struct{}) error {
podInformer := informers.CreateSharedIndexPodInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := informers.CreateSharedPodIndexInformer(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "pod-informer")), ResyncPeriod(s)())
informers := map[reflect.Type]framework.SharedIndexInformer{}
informers[reflect.TypeOf(&api.Pod{})] = podInformer

View File

@ -32,6 +32,8 @@ type Indexer interface {
ListIndexFuncValues(indexName string) []string
// ByIndex lists object that match on the named indexing function with the exact key
ByIndex(indexName, indexKey string) ([]interface{}, error)
// GetIndexer return the indexers
GetIndexers() Indexers
}
// IndexFunc knows how to provide an indexed value for an object.

View File

@ -91,8 +91,16 @@ func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, e
key := &api.Pod{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
return api.PodList{}, err
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
pod := m.(*api.Pod)
if s.namespace == pod.Namespace && selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
return list, err
}
for _, m := range items {
pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) {
@ -200,6 +208,13 @@ func (s storeReplicationControllersNamespacer) List(selector labels.Selector) (c
key := &api.ReplicationController{ObjectMeta: api.ObjectMeta{Namespace: s.namespace}}
items, err := s.indexer.Index(NamespaceIndex, key)
if err != nil {
glog.Warningf("can not retrieve list of objects using index : %v", err)
for _, m := range s.indexer.List() {
rc := *(m.(*api.ReplicationController))
if s.namespace == rc.Namespace && selector.Matches(labels.Set(rc.Labels)) {
controllers = append(controllers, rc)
}
}
return
}
for _, m := range items {

View File

@ -160,6 +160,11 @@ func (c *cache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}
// GetIndexers returns the indexers of cache
func (c *cache) GetIndexers() Indexers {
return c.cacheStorage.GetIndexers()
}
// Index returns a list of items that match on the index function
// Index is thread-safe so long as you treat all items as immutable
func (c *cache) Index(indexName string, obj interface{}) ([]interface{}, error) {

View File

@ -45,6 +45,7 @@ type ThreadSafeStore interface {
Index(indexName string, obj interface{}) ([]interface{}, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
}
// threadSafeMap implements ThreadSafeStore
@ -187,6 +188,10 @@ func (c *threadSafeMap) ListIndexFuncValues(indexName string) []string {
return names
}
func (c *threadSafeMap) GetIndexers() Indexers {
return c.indexers
}
// updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj
// updateIndices must be called from a function that already has a lock on the cache
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) error {

View File

@ -210,7 +210,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
}
func NewDaemonSetsControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
dsc := NewDaemonSetsController(podInformer, kubeClient, resyncPeriod, lookupCacheSize)
dsc.internalPodInformer = podInformer

View File

@ -104,7 +104,7 @@ func NewEndpointController(podInformer framework.SharedIndexInformer, client *cl
// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
podInformer := informers.CreateSharedIndexPodInformer(client, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := informers.CreateSharedPodIndexInformer(client, resyncPeriod())
e := NewEndpointController(podInformer, client)
e.internalPodInformer = podInformer

View File

@ -27,9 +27,9 @@ import (
"k8s.io/kubernetes/pkg/watch"
)
// CreateSharedPodInformer returns a SharedInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedInformer {
sharedInformer := framework.NewSharedInformer(
// CreateSharedPodInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
return client.Core().Pods(api.NamespaceAll).List(options)
@ -38,13 +38,16 @@ func CreateSharedPodInformer(client clientset.Interface, resyncPeriod time.Durat
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{}, resyncPeriod)
&api.Pod{},
resyncPeriod,
cache.Indexers{},
)
return sharedInformer
}
// CreateSharedIndexPodInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedIndexPodInformer(client clientset.Interface, resyncPeriod time.Duration, indexers cache.Indexers) framework.SharedIndexInformer {
// CreateSharedPodIndexInformer returns a SharedIndexInformer that lists and watches all pods
func CreateSharedPodIndexInformer(client clientset.Interface, resyncPeriod time.Duration) framework.SharedIndexInformer {
sharedIndexInformer := framework.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -54,7 +57,10 @@ func CreateSharedIndexPodInformer(client clientset.Interface, resyncPeriod time.
return client.Core().Pods(api.NamespaceAll).Watch(options)
},
},
&api.Pod{}, resyncPeriod, indexers)
&api.Pod{},
resyncPeriod,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return sharedIndexInformer
}

View File

@ -48,6 +48,7 @@ type SharedInformer interface {
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers cache.Indexers) error
GetIndexer() cache.Indexer
}
@ -57,48 +58,26 @@ type SharedIndexInformer interface {
// be shared amongst all consumers.
func NewSharedInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
sharedInformer := &sharedIndexInformer{
processor: &sharedProcessor{},
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}),
processor: &sharedProcessor{},
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}),
listerWatcher: lw,
objectType: objType,
fullResyncPeriod: resyncPeriod,
}
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedInformer.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: sharedInformer.HandleDeltas,
}
sharedInformer.controller = New(cfg)
return sharedInformer
}
/// NewSharedIndexInformer creates a new instance for the listwatcher.
// NewSharedIndexInformer creates a new instance for the listwatcher.
// TODO: create a cache/factory of these at a higher level for the list all, watch all of a given resource that can
// be shared amongst all consumers.
func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, indexers cache.Indexers) SharedIndexInformer {
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{},
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
processor: &sharedProcessor{},
indexer: cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
fullResyncPeriod: resyncPeriod,
}
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, sharedIndexInformer.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: sharedIndexInformer.HandleDeltas,
}
sharedIndexInformer.controller = New(cfg)
return sharedIndexInformer
}
@ -108,6 +87,11 @@ type sharedIndexInformer struct {
processor *sharedProcessor
// This block is tracked to handle late initialization of the controller
listerWatcher cache.ListerWatcher
objectType runtime.Object
fullResyncPeriod time.Duration
started bool
startedLock sync.Mutex
}
@ -144,6 +128,19 @@ type deleteNotification struct {
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, nil, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.fullResyncPeriod,
RetryOnError: false,
Process: s.HandleDeltas,
}
s.controller = New(cfg)
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
@ -172,9 +169,25 @@ func (s *sharedIndexInformer) GetIndexer() cache.Indexer {
return s.indexer
}
// TODO(mqliang): implement this
func (s *sharedIndexInformer) AddIndexers(indexers cache.Indexers) error {
panic("has not implemeted yet")
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
oldIndexers := s.indexer.GetIndexers()
for name, indexFunc := range oldIndexers {
if _, exist := indexers[name]; exist {
return fmt.Errorf("there is an index named %s already exist", name)
}
indexers[name] = indexFunc
}
s.indexer = cache.NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers)
return nil
}
func (s *sharedIndexInformer) GetController() ControllerInterface {

View File

@ -77,8 +77,10 @@ func New(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFun
&api.Pod{},
resyncPeriod(),
framework.ResourceEventHandlerFuncs{},
// We don't need to build a index for podStore here
cache.Indexers{},
// We don't need to build a index for podStore here actually, but build one for consistency.
// It will ensure that if people start making use of the podStore in more specific ways,
// they'll get the benefits they expect. It will also reserve the name for future refactorings.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
return gcc
}

View File

@ -135,7 +135,7 @@ func NewJobController(podInformer framework.SharedIndexInformer, kubeClient clie
}
func NewJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
jm := NewJobController(podInformer, kubeClient)
jm.internalPodInformer = podInformer

View File

@ -193,8 +193,10 @@ func NewNodeController(
AddFunc: nc.maybeDeleteTerminatingPod,
UpdateFunc: func(_, obj interface{}) { nc.maybeDeleteTerminatingPod(obj) },
},
// We don't need to build a index for podStore here
cache.Indexers{},
// We don't need to build a index for podStore here actually, but build one for consistency.
// It will ensure that if people start making use of the podStore in more specific ways,
// they'll get the benefits they expect. It will also reserve the name for future refactorings.
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
nc.nodeStore.Store, nc.nodeController = framework.NewInformer(
&cache.ListWatch{

View File

@ -81,7 +81,7 @@ type PetSetController struct {
}
// NewPetSetController creates a new petset controller.
func NewPetSetController(podInformer framework.SharedInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController {
func NewPetSetController(podInformer framework.SharedIndexInformer, kubeClient *client.Client, resyncPeriod time.Duration) *PetSetController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
@ -105,7 +105,7 @@ func NewPetSetController(podInformer framework.SharedInformer, kubeClient *clien
// lookup petset accounting for deletion tombstones
DeleteFunc: psc.deletePod,
})
psc.podStore.Store = podInformer.GetStore()
psc.podStore.Indexer = podInformer.GetIndexer()
psc.podController = podInformer.GetController()
psc.psStore.Store, psc.psController = framework.NewInformer(

View File

@ -35,7 +35,7 @@ func newFakePetSetController() (*PetSetController, *fakePetClient) {
blockingPetStore: newUnHealthyPetTracker(fpc),
podStoreSynced: func() bool { return true },
psStore: cache.StoreToPetSetLister{Store: cache.NewStore(controller.KeyFunc)},
podStore: cache.StoreToPodLister{Store: cache.NewStore(controller.KeyFunc)},
podStore: cache.StoreToPodLister{Indexer: cache.NewIndexer(controller.KeyFunc, cache.Indexers{})},
newSyncer: func(blockingPet *pcb) *petSyncer {
return &petSyncer{fpc, blockingPet}
},

View File

@ -200,7 +200,7 @@ func NewReplicationManager(podInformer framework.SharedIndexInformer, kubeClient
// NewReplicationManagerFromClient creates a new ReplicationManager that runs its own informer.
func NewReplicationManagerFromClient(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int, lookupCacheSize int) *ReplicationManager {
podInformer := informers.CreateSharedIndexPodInformer(kubeClient, resyncPeriod(), cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podInformer := informers.CreateSharedPodIndexInformer(kubeClient, resyncPeriod())
rm := NewReplicationManager(podInformer, kubeClient, resyncPeriod, burstReplicas, lookupCacheSize)
rm.internalPodInformer = podInformer

View File

@ -231,7 +231,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
activePods := 5
rc := newReplicationController(activePods)
manager.rcStore.Indexer.Add(rc)
rc.Status = api.ReplicationControllerStatus{Replicas: activePods}
rc.Status = api.ReplicationControllerStatus{Replicas: int32(activePods)}
newPodList(manager.podStore.Indexer, activePods, api.PodRunning, rc, "pod")
fakePodControl := controller.FakePodControl{}
@ -1015,7 +1015,6 @@ func TestDeletionTimestamp(t *testing.T) {
}
}
/*
func BenchmarkGetPodControllerMultiNS(b *testing.B) {
client := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
manager := NewReplicationManagerFromClient(client, controller.NoResyncPeriodFunc, BurstReplicas, 0)
@ -1101,4 +1100,3 @@ func BenchmarkGetPodControllerSingleNS(b *testing.B) {
}
}
}
*/