Merge pull request #38503 from dshulyak/ttlassumed

Automatic merge from submit-queue

Ensure that assumed pod won't be expired until the end of binding

If the API server is overloaded and replies with 429 (Too Many Requests) errors, binding may take longer than the scheduler cache TTL for assumed pods; see 1199d42210/pkg/client/restclient/request.go (L787-L850).

This problem was mitigated by the fix in e4d215d508 and by an increased rate limit for the API server, but it is possible that it will occur again.
pull/6/head
Kubernetes Submit Queue 2017-01-03 04:15:06 -08:00 committed by GitHub
commit 2b7899ae46
6 changed files with 143 additions and 14 deletions

View File

@ -138,6 +138,9 @@ func (s *Scheduler) scheduleOne() {
// If binding succeeded then PodScheduled condition will be updated in apiserver so that
// it's atomic with setting host.
err := s.config.Binder.Bind(b)
if err := s.config.SchedulerCache.FinishBinding(&assumed); err != nil {
glog.Errorf("scheduler cache FinishBinding failed: %v", err)
}
if err != nil {
glog.V(1).Infof("Failed to bind pod: %v/%v", pod.Namespace, pod.Name)
if err := s.config.SchedulerCache.ForgetPod(&assumed); err != nil {

View File

@ -297,6 +297,65 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
}
}
// TestSchedulerErrorWithLongBinding checks that the scheduler keeps honouring
// the predicate constraint when a bind call takes longer than the cache ttl.
//
// Two pods requesting the same port are queued against a single node. For
// every case, whether binding outlasts the cache ttl or not, the test
// expects that only firstPod ends up being bound.
func TestSchedulerErrorWithLongBinding(t *testing.T) {
	stop := make(chan struct{})
	defer close(stop)

	firstPod := podWithPort("foo", "", 8080)
	conflictPod := podWithPort("bar", "", 8080)
	pods := map[string]*v1.Pod{
		firstPod.Name:    firstPod,
		conflictPod.Name: conflictPod,
	}

	type longBindingCase struct {
		Expected        map[string]bool
		CacheTTL        time.Duration
		BindingDuration time.Duration
	}
	cases := []longBindingCase{
		{
			// Binding takes longer than the cache ttl.
			Expected:        map[string]bool{firstPod.Name: true},
			CacheTTL:        100 * time.Millisecond,
			BindingDuration: 300 * time.Millisecond,
		},
		{
			// Binding completes well within the cache ttl.
			Expected:        map[string]bool{firstPod.Name: true},
			CacheTTL:        10 * time.Second,
			BindingDuration: 300 * time.Millisecond,
		},
	}

	for _, tc := range cases {
		queuedPodStore := clientcache.NewFIFO(clientcache.MetaNamespaceKeyFunc)
		scache := schedulercache.New(tc.CacheTTL, stop)

		node := &v1.Node{ObjectMeta: v1.ObjectMeta{Name: "machine1"}}
		scache.AddNode(node)
		nodeLister := algorithm.FakeNodeLister([]*v1.Node{node})

		predicateMap := map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts}

		scheduler, bindingChan := setupTestSchedulerLongBindingWithRetry(
			queuedPodStore, scache, nodeLister, predicateMap, stop, tc.BindingDuration)
		scheduler.Run()

		queuedPodStore.Add(firstPod)
		queuedPodStore.Add(conflictPod)

		// Record every binding reported during a fixed five second window and
		// confirm each bound pod in the cache with its target node.
		bound := map[string]bool{}
		timeout := time.After(5 * time.Second)
	collect:
		for {
			select {
			case binding := <-bindingChan:
				bound[binding.Name] = true
				boundPod := pods[binding.Name]
				boundPod.Spec.NodeName = binding.Target.Name
				scache.AddPod(boundPod)
			case <-timeout:
				break collect
			}
		}

		if !reflect.DeepEqual(bound, tc.Expected) {
			t.Errorf("Result binding are not equal to expected. %v != %v", bound, tc.Expected)
		}
	}
}
// queuedPodStore: pods queued before processing.
// cache: scheduler cache that might contain assumed pods.
func setupTestSchedulerWithOnePodOnNode(t *testing.T, queuedPodStore *clientcache.FIFO, scache schedulercache.Cache,
@ -429,3 +488,34 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
}
return New(cfg), bindingChan, errChan
}
// setupTestSchedulerLongBindingWithRetry builds a Scheduler whose binder
// sleeps for bindingTime before reporting the binding on the returned channel,
// and whose error handler puts the failed pod back into queuedPodStore unless
// it is already present.
//
// The returned channel is buffered with capacity 2 and receives each binding
// after the simulated delay.
func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, scache schedulercache.Cache, nodeLister algorithm.FakeNodeLister, predicateMap map[string]algorithm.FitPredicate, stop chan struct{}, bindingTime time.Duration) (*Scheduler, chan *v1.Binding) {
	algo := NewGenericScheduler(
		scache,
		predicateMap,
		algorithm.EmptyMetadataProducer,
		[]algorithm.PriorityConfig{},
		algorithm.EmptyMetadataProducer,
		[]algorithm.SchedulerExtender{})

	bindingChan := make(chan *v1.Binding, 2)

	// slowBind simulates a bind call that takes bindingTime and always succeeds.
	slowBind := func(b *v1.Binding) error {
		time.Sleep(bindingTime)
		bindingChan <- b
		return nil
	}
	// popNext takes the next pod off the queue.
	popNext := func() *v1.Pod {
		return clientcache.Pop(queuedPodStore).(*v1.Pod)
	}
	// requeue puts a pod that failed scheduling back into the queue for a retry.
	requeue := func(p *v1.Pod, err error) {
		queuedPodStore.AddIfNotPresent(p)
	}

	cfg := &Config{
		SchedulerCache:      scache,
		NodeLister:          nodeLister,
		Algorithm:           algo,
		Binder:              fakeBinder{slowBind},
		NextPod:             popNext,
		Error:               requeue,
		Recorder:            &record.FakeRecorder{},
		PodConditionUpdater: fakePodConditionUpdater{},
		StopEverything:      stop,
	}
	return New(cfg), bindingChan
}

View File

@ -60,6 +60,8 @@ type podState struct {
pod *v1.Pod
// Used by assumedPod to determine expiration.
deadline *time.Time
// Used to block cache from expiring assumedPod if binding still runs
bindingFinished bool
}
func newSchedulerCache(ttl, period time.Duration, stop <-chan struct{}) *schedulerCache {
@ -105,11 +107,6 @@ func (cache *schedulerCache) List(selector labels.Selector) ([]*v1.Pod, error) {
}
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
return cache.assumePod(pod, time.Now())
}
// assumePod exists for making test deterministic by taking time as input argument.
func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error {
key, err := getPodKey(pod)
if err != nil {
return err
@ -122,16 +119,38 @@ func (cache *schedulerCache) assumePod(pod *v1.Pod, now time.Time) error {
}
cache.addPod(pod)
dl := now.Add(cache.ttl)
ps := &podState{
pod: pod,
deadline: &dl,
pod: pod,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
return nil
}
// FinishBinding delegates to finishBinding with the current time and returns
// its result.
func (cache *schedulerCache) FinishBinding(pod *v1.Pod) error {
	now := time.Now()
	return cache.finishBinding(pod, now)
}
// finishBinding is the clock-injectable core of FinishBinding; taking now as
// an argument keeps tests deterministic.
//
// When pod has an entry in podStates and is marked as assumed, that entry is
// flagged as having finished binding and its deadline is set to now plus the
// cache ttl. Otherwise no state is modified. An error is returned only when
// the pod key cannot be computed.
func (cache *schedulerCache) finishBinding(pod *v1.Pod, now time.Time) error {
	key, err := getPodKey(pod)
	if err != nil {
		return err
	}

	cache.mu.Lock()
	defer cache.mu.Unlock()

	glog.V(5).Infof("Finished binding for pod %v. Can be expired.", key)

	state, tracked := cache.podStates[key]
	if !tracked || !cache.assumedPods[key] {
		// Nothing to update for a pod that is not currently assumed.
		return nil
	}

	expiry := now.Add(cache.ttl)
	state.deadline = &expiry
	state.bindingFinished = true
	return nil
}
func (cache *schedulerCache) ForgetPod(pod *v1.Pod) error {
key, err := getPodKey(pod)
if err != nil {
@ -343,6 +362,11 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if !ps.bindingFinished {
glog.Warningf("Couldn't expire cache for pod %v/%v. Binding is still in progress.",
ps.pod.Namespace, ps.pod.Name)
continue
}
if now.After(*ps.deadline) {
glog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name)
if err := cache.expirePod(key, ps); err != nil {

View File

@ -123,6 +123,13 @@ type testExpirePodStruct struct {
assumedTime time.Time
}
// assumeAndFinishBinding assumes pod in cache and, if that succeeds, marks its
// binding as finished at assumedTime. The first error encountered is returned.
func assumeAndFinishBinding(cache *schedulerCache, pod *v1.Pod, assumedTime time.Time) error {
	err := cache.AssumePod(pod)
	if err == nil {
		err = cache.finishBinding(pod, assumedTime)
	}
	return err
}
// TestExpirePod tests that assumed pods will be removed if expired.
// The removal will be reflected in node info.
func TestExpirePod(t *testing.T) {
@ -168,7 +175,7 @@ func TestExpirePod(t *testing.T) {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.assumePod(pod.pod, pod.assumedTime); err != nil {
if err := assumeAndFinishBinding(cache, pod.pod, pod.assumedTime); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -215,7 +222,7 @@ func TestAddPodWillConfirm(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePod(podToAssume, now); err != nil {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -259,7 +266,7 @@ func TestAddPodAfterExpiration(t *testing.T) {
now := time.Now()
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
if err := cache.assumePod(tt.pod, now); err != nil {
if err := assumeAndFinishBinding(cache, tt.pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
@ -388,7 +395,7 @@ func TestExpireAddUpdatePod(t *testing.T) {
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAssume := range tt.podsToAssume {
if err := cache.assumePod(podToAssume, now); err != nil {
if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -471,7 +478,7 @@ func TestForgetPod(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, pod := range tt.pods {
if err := cache.assumePod(pod, now); err != nil {
if err := assumeAndFinishBinding(cache, pod, now); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
}
@ -565,7 +572,7 @@ func setupCacheWithAssumedPods(b *testing.B, podNum int, assumedTime time.Time)
objName := fmt.Sprintf("%s-pod-%d", nodeName, i%10)
pod := makeBasePod(nodeName, objName, "0", "0", nil)
err := cache.assumePod(pod, assumedTime)
err := assumeAndFinishBinding(cache, pod, assumedTime)
if err != nil {
b.Fatalf("assumePod failed: %v", err)
}

View File

@ -61,6 +61,9 @@ type Cache interface {
// After expiration, its information would be subtracted.
AssumePod(pod *v1.Pod) error
// FinishBinding signals that cache for assumed pod can be expired
FinishBinding(pod *v1.Pod) error
// ForgetPod removes an assumed pod from cache.
ForgetPod(pod *v1.Pod) error

View File

@ -32,6 +32,8 @@ func (f *FakeCache) AssumePod(pod *v1.Pod) error {
return nil
}
// FinishBinding is a no-op stub: it ignores pod and always returns nil.
func (f *FakeCache) FinishBinding(pod *v1.Pod) error { return nil }
// ForgetPod is a no-op stub: it ignores pod and always returns nil.
func (f *FakeCache) ForgetPod(pod *v1.Pod) error { return nil }
// AddPod is a no-op stub: it ignores pod and always returns nil.
func (f *FakeCache) AddPod(pod *v1.Pod) error { return nil }