Merge pull request #45453 from k82cn/k8s_45220

Automatic merge from submit-queue (batch tested with PRs 45453, 45307, 44987)

Init cache with assigned non-terminated pods before scheduling

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #45220

**Release note**:

```release-note
The fix makes scheduling go routine waiting for cache (e.g. Pod) to be synced.
```
pull/6/head
Kubernetes Submit Queue 2017-05-09 20:23:37 -07:00 committed by GitHub
commit 61593ba8b8
12 changed files with 201 additions and 1 deletions

View File

@ -127,6 +127,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func()map[reflect.Type]cache.SharedIndexInformer{
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj {{.runtimeObject|raw}}, newFunc {{.interfacesNewInformerFunc|raw}}) {{.cacheSharedIndexInformer|raw}} {
@ -152,6 +174,7 @@ var sharedInformerFactoryInterface = `
type SharedInformerFactory interface {
{{.informerFactoryInterface|raw}}
ForResource(resource {{.schemaGroupVersionResource|raw}}) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
{{$gvInterfaces := .gvInterfaces}}
{{range $groupName, $group := .groupVersions}}{{$groupName}}() {{index $gvInterfaces $groupName|raw}}

View File

@ -73,6 +73,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -95,6 +117,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apps() apps.Interface
Autoscaling() autoscaling.Interface

View File

@ -73,6 +73,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -95,6 +117,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apps() apps.Interface
Autoscaling() autoscaling.Interface

View File

@ -94,6 +94,8 @@ func Run(s *options.SchedulerServer) error {
stop := make(chan struct{})
defer close(stop)
informerFactory.Start(stop)
// Waiting for all cache to sync before scheduling.
informerFactory.WaitForCacheSync(stop)
run := func(_ <-chan struct{}) {
sched.Run()

View File

@ -380,6 +380,9 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
Algorithm: algo,
Binder: &binder{f.client},
PodConditionUpdater: &podConditionUpdater{f.client},
WaitForCacheSync: func() bool {
return cache.WaitForCacheSync(f.StopEverything, f.scheduledPodPopulator.HasSynced)
},
NextPod: func() *v1.Pod {
return f.getNextPod()
},

View File

@ -110,6 +110,10 @@ type Config struct {
// stale while they sit in a channel.
NextPod func() *v1.Pod
// WaitForCacheSync waits for scheduler cache to populate.
// It returns true if it was successful, false if the controller should shutdown.
WaitForCacheSync func() bool
// Error is called if there is an error. It is passed the pod in
// question, and the error
Error func(*v1.Pod, error)
@ -140,8 +144,12 @@ func NewFromConfigurator(c Configurator, modifiers ...func(c *Config)) (*Schedul
return s, nil
}
// Run begins watching and scheduling. It starts a goroutine and returns immediately.
// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
if !sched.config.WaitForCacheSync() {
return
}
go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

View File

@ -543,6 +543,9 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
bindingChan <- b
return nil
}},
WaitForCacheSync: func() bool {
return true
},
NextPod: func() *v1.Pod {
return clientcache.Pop(queuedPodStore).(*v1.Pod)
},

View File

@ -73,6 +73,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -95,6 +117,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apps() apps.Interface
Autoscaling() autoscaling.Interface

View File

@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apiregistration() apiregistration.Interface
}

View File

@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apiregistration() apiregistration.Interface
}

View File

@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apiextensions() apiextensions.Interface
}

View File

@ -64,6 +64,28 @@ func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
@ -86,6 +108,7 @@ func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internal
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Apiextensions() apiextensions.Interface
}