diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index e6e49b9b6b..f55ff5399d 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -288,25 +288,6 @@ func (s *StoreToPVFetcher) GetPersistentVolumeInfo(id string) (*api.PersistentVo return o.(*api.PersistentVolume), nil } -// Typed wrapper around a store of PersistentVolumeClaims -type StoreToPVCFetcher struct { - Store -} - -// GetPersistentVolumeClaimInfo returns cached data for the PersistentVolumeClaim 'id'. -func (s *StoreToPVCFetcher) GetPersistentVolumeClaimInfo(namespace string, id string) (*api.PersistentVolumeClaim, error) { - o, exists, err := s.Get(&api.PersistentVolumeClaim{ObjectMeta: api.ObjectMeta{Namespace: namespace, Name: id}}) - if err != nil { - return nil, fmt.Errorf("error retrieving PersistentVolumeClaim '%s/%s' from cache: %v", namespace, id, err) - } - - if !exists { - return nil, fmt.Errorf("PersistentVolumeClaim '%s/%s' not found", namespace, id) - } - - return o.(*api.PersistentVolumeClaim), nil -} - // StoreToPetSetLister gives a store List and Exists methods. The store must contain only PetSets. type StoreToPetSetLister struct { Store diff --git a/pkg/client/cache/listers_core.go b/pkg/client/cache/listers_core.go index df8f4281dd..3546c90e0b 100644 --- a/pkg/client/cache/listers_core.go +++ b/pkg/client/cache/listers_core.go @@ -216,6 +216,19 @@ func (s *StoreToLimitRangeLister) List(selector labels.Selector) (ret []*api.Lim return ret, err } +// StoreToPersistentVolumeClaimLister helps list pvcs +type StoreToPersistentVolumeClaimLister struct { + Indexer Indexer +} + +// List returns all persistentvolumeclaims that match the specified selector +func (s *StoreToPersistentVolumeClaimLister) List(selector labels.Selector) (ret []*api.PersistentVolumeClaim, err error) { + err = ListAll(s.Indexer, selector, func(m interface{}) { + ret = append(ret, m.(*api.PersistentVolumeClaim)) + }) + return ret, err +} + func (s *StoreToLimitRangeLister) LimitRanges(namespace string) storeLimitRangesNamespacer { return storeLimitRangesNamespacer{s.Indexer, namespace} } @@ -242,3 +255,31 @@ func (s storeLimitRangesNamespacer) Get(name string) (*api.LimitRange, error) { } return obj.(*api.LimitRange), nil } + +// PersistentVolumeClaims returns all claims in a specified namespace. +func (s *StoreToPersistentVolumeClaimLister) PersistentVolumeClaims(namespace string) storePersistentVolumeClaimsNamespacer { + return storePersistentVolumeClaimsNamespacer{Indexer: s.Indexer, namespace: namespace} +} + +type storePersistentVolumeClaimsNamespacer struct { + Indexer Indexer + namespace string +} + +func (s storePersistentVolumeClaimsNamespacer) List(selector labels.Selector) (ret []*api.PersistentVolumeClaim, err error) { + err = ListAllByNamespace(s.Indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*api.PersistentVolumeClaim)) + }) + return ret, err +} + +func (s storePersistentVolumeClaimsNamespacer) Get(name string) (*api.PersistentVolumeClaim, error) { + obj, exists, err := s.Indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(api.Resource("persistentvolumeclaims"), name) + } + return obj.(*api.PersistentVolumeClaim), nil +} diff --git a/pkg/controller/informers/core.go b/pkg/controller/informers/core.go index 2f5687fc2f..62ff70365e 100644 --- a/pkg/controller/informers/core.go +++ b/pkg/controller/informers/core.go @@ -139,7 +139,7 @@ func (f *nodeInformer) Lister() *cache.StoreToNodeLister { // Interface provides constructor for informer and lister for persistent volume claims type PVCInformer interface { Informer() cache.SharedIndexInformer - Lister() *cache.StoreToPVCFetcher + Lister() *cache.StoreToPersistentVolumeClaimLister } type pvcInformer struct { @@ -164,9 +164,9 @@ func (f *pvcInformer) Informer() cache.SharedIndexInformer { } // Lister returns lister for pvcInformer -func (f *pvcInformer) Lister() *cache.StoreToPVCFetcher { +func (f *pvcInformer) Lister() *cache.StoreToPersistentVolumeClaimLister { informer := f.Informer() - return &cache.StoreToPVCFetcher{Store: informer.GetStore()} + return &cache.StoreToPersistentVolumeClaimLister{Indexer: informer.GetIndexer()} } //***************************************************************************** @@ -291,7 +291,8 @@ func NewPVCInformer(client clientset.Interface, resyncPeriod time.Duration) cach }, &api.PersistentVolumeClaim{}, resyncPeriod, - cache.Indexers{}) + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + ) return sharedIndexInformer } diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 53ed83e1d3..f2fc77b696 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -45,7 +45,17 @@ type PersistentVolumeInfo interface { } type PersistentVolumeClaimInfo interface { - GetPersistentVolumeClaimInfo(namespace string, pvcID string) (*api.PersistentVolumeClaim, error) + GetPersistentVolumeClaimInfo(namespace string, name string) (*api.PersistentVolumeClaim, error) +} + +// CachedPersistentVolumeClaimInfo implements PersistentVolumeClaimInfo +type CachedPersistentVolumeClaimInfo struct { + *cache.StoreToPersistentVolumeClaimLister +} + +// GetPersistentVolumeClaimInfo fetches the claim in specified namespace with specified name +func (c *CachedPersistentVolumeClaimInfo) GetPersistentVolumeClaimInfo(namespace string, name string) (*api.PersistentVolumeClaim, error) { + return c.PersistentVolumeClaims(namespace).Get(name) } type CachedNodeInfo struct { diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 3cc793a054..d998ce1720 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/controller/informers" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" @@ -65,7 +66,7 @@ type ConfigFactory struct { // a means to list all PersistentVolumes PVLister *cache.StoreToPVFetcher // a means to list all PersistentVolumeClaims - PVCLister *cache.StoreToPVCFetcher + PVCLister *cache.StoreToPersistentVolumeClaimLister // a means to list all services ServiceLister *cache.StoreToServiceLister // a means to list all controllers @@ -76,10 +77,11 @@ type ConfigFactory struct { // Close this to stop all reflectors StopEverything chan struct{} + informerFactory informers.SharedInformerFactory scheduledPodPopulator *cache.Controller nodePopulator *cache.Controller pvPopulator *cache.Controller - pvcPopulator *cache.Controller + pvcPopulator cache.ControllerInterface servicePopulator *cache.Controller controllerPopulator *cache.Controller @@ -107,14 +109,20 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA stopEverything := make(chan struct{}) schedulerCache := schedulercache.New(30*time.Second, stopEverything) + // TODO: pass this in as an argument... + informerFactory := informers.NewSharedInformerFactory(client, 0) + pvcInformer := informerFactory.PersistentVolumeClaims() + c := &ConfigFactory{ Client: client, PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), ScheduledPodLister: &cache.StoreToPodLister{}, + informerFactory: informerFactory, // Only nodes in the "Ready" condition with status == "True" are schedulable NodeLister: &cache.StoreToNodeLister{}, PVLister: &cache.StoreToPVFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, - PVCLister: &cache.StoreToPVCFetcher{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}, + PVCLister: pvcInformer.Lister(), + pvcPopulator: pvcInformer.Informer().GetController(), ServiceLister: &cache.StoreToServiceLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ControllerLister: &cache.StoreToReplicationControllerLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, ReplicaSetLister: &cache.StoreToReplicaSetLister{Indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})}, @@ -162,13 +170,6 @@ func NewConfigFactory(client clientset.Interface, schedulerName string, hardPodA cache.ResourceEventHandlerFuncs{}, ) - c.PVCLister.Store, c.pvcPopulator = cache.NewInformer( - c.createPersistentVolumeClaimLW(), - &api.PersistentVolumeClaim{}, - 0, - cache.ResourceEventHandlerFuncs{}, - ) - c.ServiceLister.Indexer, c.servicePopulator = cache.NewIndexerInformer( c.createServiceLW(), &api.Service{}, @@ -434,7 +435,7 @@ func (f *ConfigFactory) getPluginArgs() (*PluginFactoryArgs, error) { NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()), NodeInfo: &predicates.CachedNodeInfo{StoreToNodeLister: f.NodeLister}, PVInfo: f.PVLister, - PVCInfo: f.PVCLister, + PVCInfo: &predicates.CachedPersistentVolumeClaimInfo{StoreToPersistentVolumeClaimLister: f.PVCLister}, HardPodAffinitySymmetricWeight: f.HardPodAffinitySymmetricWeight, FailureDomains: sets.NewString(failureDomainArgs...).List(), }, nil @@ -460,6 +461,9 @@ func (f *ConfigFactory) Run() { // Begin populating controllers go f.controllerPopulator.Run(f.StopEverything) + // start informers... + f.informerFactory.Start(f.StopEverything) + // Watch and cache all ReplicaSet objects. Scheduler needs to find all pods // created by the same services or ReplicationControllers/ReplicaSets, so that it can spread them correctly. // Cache this locally.