Merge pull request #55262 from liggitt/schedulercache

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Fix 'Schedulercache is corrupted' error

Fixes #50916

If an Assume()ed pod is Add()ed with a different nodeName, the podStates view of the pod is not corrected to reflect the actual nodeName. On the next Update(), the scheduler observes the mismatch and the process exits.
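As an illustration only, here is a minimal sketch of the failing sequence, written as if it ran inside the schedulercache package (the podBoundTo helper is invented for this sketch and the metav1/types identifiers assume the package's usual apimachinery imports; the real tests build pods with makeBasePod):

```go
// podBoundTo is a hypothetical helper for this sketch only.
func podBoundTo(name, nodeName string) *v1.Pod {
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: name, UID: types.UID(name)},
		Spec:       v1.PodSpec{NodeName: nodeName},
	}
}

func reproduce() error {
	cache := newSchedulerCache(10*time.Second, time.Second, nil)

	// The scheduler assumes the pod onto one node...
	if err := cache.AssumePod(podBoundTo("test-1", "assumed-node")); err != nil {
		return err
	}

	// ...but the informer reports it bound to a different node.
	added := podBoundTo("test-1", "actual-node")
	if err := cache.AddPod(added); err != nil {
		return err
	}

	// Before this fix, podStates still held the assumed pod (nodeName
	// "assumed-node"), so this Update() saw a node-name mismatch and the
	// scheduler exited with "Schedulercache is corrupted".
	return cache.UpdatePod(added, podBoundTo("test-1", "actual-node"))
}
```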

```release-note
Fixed 'Schedulercache is corrupted' error in kube-scheduler
```
Kubernetes Submit Queue 2017-11-07 23:23:33 -08:00 committed by GitHub
commit 33f873dbbe
2 changed files with 63 additions and 0 deletions


@@ -241,6 +241,7 @@ func (cache *schedulerCache) AddPod(pod *v1.Pod) error {
		}
		delete(cache.assumedPods, key)
		cache.podStates[key].deadline = nil
		cache.podStates[key].pod = pod
	case !ok:
		// Pod was expired. We should add it back.
		cache.addPod(pod)
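For context, the assumed-pod branch of AddPod after this change reads roughly as below; everything except the single added line is paraphrased from the surrounding code and may differ in detail from the upstream file:

```go
case ok && cache.assumedPods[key]:
	if currState.pod.Spec.NodeName != pod.Spec.NodeName {
		// The pod was added to a different node than it was assumed to;
		// move its accounting to the correct NodeInfo.
		cache.removePod(currState.pod)
		cache.addPod(pod)
	}
	delete(cache.assumedPods, key)
	cache.podStates[key].deadline = nil
	// Added by this PR: record the confirmed pod so that later Update()
	// calls compare against the actual nodeName instead of the assumed one.
	cache.podStates[key].pod = pod
case !ok:
	// Pod was expired. We should add it back.
	cache.addPod(pod)
```

Keeping podStates in sync with the confirmed pod is what prevents the later node-name mismatch that triggered the fatal error.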


@@ -300,6 +300,68 @@ func TestAddPodWillConfirm(t *testing.T) {
	}
}
// TestAddPodWillReplaceAssumed tests that a pod being Add()ed will replace any assumed pod.
func TestAddPodWillReplaceAssumed(t *testing.T) {
	now := time.Now()
	ttl := 10 * time.Second

	assumedPod := makeBasePod(t, "assumed-node-1", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	addedPod := makeBasePod(t, "actual-node", "test-1", "100m", "500", "", []v1.ContainerPort{{HostPort: 80}})
	updatedPod := makeBasePod(t, "actual-node", "test-1", "200m", "500", "", []v1.ContainerPort{{HostPort: 90}})

	tests := []struct {
		podsToAssume []*v1.Pod
		podsToAdd    []*v1.Pod
		podsToUpdate [][]*v1.Pod

		wNodeInfo map[string]*NodeInfo
	}{{
		podsToAssume: []*v1.Pod{assumedPod.DeepCopy()},
		podsToAdd:    []*v1.Pod{addedPod.DeepCopy()},
		podsToUpdate: [][]*v1.Pod{{addedPod.DeepCopy(), updatedPod.DeepCopy()}},
		wNodeInfo: map[string]*NodeInfo{
			"assumed-node": nil,
			"actual-node": {
				requestedResource: &Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				nonzeroRequest: &Resource{
					MilliCPU: 200,
					Memory:   500,
				},
				allocatableResource: &Resource{},
				pods:                []*v1.Pod{updatedPod.DeepCopy()},
				usedPorts:           map[int]bool{90: true},
			},
		},
	}}

	for i, tt := range tests {
		cache := newSchedulerCache(ttl, time.Second, nil)
		for _, podToAssume := range tt.podsToAssume {
			if err := assumeAndFinishBinding(cache, podToAssume, now); err != nil {
				t.Fatalf("assumePod failed: %v", err)
			}
		}
		for _, podToAdd := range tt.podsToAdd {
			if err := cache.AddPod(podToAdd); err != nil {
				t.Fatalf("AddPod failed: %v", err)
			}
		}
		for _, podToUpdate := range tt.podsToUpdate {
			if err := cache.UpdatePod(podToUpdate[0], podToUpdate[1]); err != nil {
				t.Fatalf("UpdatePod failed: %v", err)
			}
		}
		for nodeName, expected := range tt.wNodeInfo {
			t.Log(nodeName)
			n := cache.nodes[nodeName]
			deepEqualWithoutGeneration(t, i, n, expected)
		}
	}
}
// TestAddPodAfterExpiration tests that a pod being Add()ed will be added back if expired.
func TestAddPodAfterExpiration(t *testing.T) {
	nodeName := "node"