Fix a race in the scheduler.

Loop over priorityConfigs separately. The per-node loop can only safely
modify results[i][index]. Before this change it sometimes modified
results[i] concurrently with other parallel node workers.

Fixes: 7164967662
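
For illustration only, a minimal standalone Go sketch of the two access
patterns described above. It is not taken from the scheduler; the
hostPriority type and the unsafeFill/safeFill helpers are made up for this
example. Under `go run -race`, the first variant is reported by the race
detector and the second is not.

// racesketch.go: contrast reassigning a shared slice header (racy) with
// writing distinct elements of the same slice (safe).
package main

import (
    "fmt"
    "sync"
)

type hostPriority struct {
    Host  string
    Score int
}

// unsafeFill mimics the removed pattern: every per-node worker may read and
// reassign the shared slice header results[0], so the workers race on it.
func unsafeFill(results [][]hostPriority, numNodes int) {
    var wg sync.WaitGroup
    for index := 0; index < numNodes; index++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            if results[0][0].Host == "" { // concurrent read of results[0]
                results[0] = make([]hostPriority, numNodes) // concurrent write of results[0]
            }
            results[0][index] = hostPriority{Host: fmt.Sprintf("node-%d", index)}
        }(index)
    }
    wg.Wait()
}

// safeFill mimics the fixed pattern: results[0] is sized once up front and
// each worker then writes only its own element, results[0][index].
func safeFill(results [][]hostPriority, numNodes int) {
    results[0] = make([]hostPriority, numNodes)
    var wg sync.WaitGroup
    for index := 0; index < numNodes; index++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            results[0][index] = hostPriority{Host: fmt.Sprintf("node-%d", index)}
        }(index)
    }
    wg.Wait()
}

func main() {
    results := make([][]hostPriority, 1)
    results[0] = make([]hostPriority, 4)
    unsafeFill(results, 4) // flagged by `go run -race`
    safeFill(results, 4)   // clean under the race detector
    fmt.Println(results[0])
}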

==================== Test output for //pkg/scheduler/core:go_default_test:
==================
WARNING: DATA RACE
Read at 0x00c0005e8ed0 by goroutine 22:
  k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes.func2()
      pkg/scheduler/core/generic_scheduler.go:667 +0x2ea
  k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil.func1()
      staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:65 +0x9e

Previous write at 0x00c0005e8ed0 by goroutine 21:
  k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes.func2()
      pkg/scheduler/core/generic_scheduler.go:668 +0x450
  k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil.func1()
      staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:65 +0x9e

Goroutine 22 (running) created at:
  k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil()
      staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:57 +0x1a3
  k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes()
      pkg/scheduler/core/generic_scheduler.go:682 +0x592
  k8s.io/kubernetes/pkg/scheduler/core.(*genericScheduler).Schedule()
      pkg/scheduler/core/generic_scheduler.go:186 +0x77d
  k8s.io/kubernetes/pkg/scheduler/core.TestGenericScheduler.func1()
      pkg/scheduler/core/generic_scheduler_test.go:464 +0x91f
  testing.tRunner()
      GOROOT/src/testing/testing.go:827 +0x162

Goroutine 21 (running) created at:
  k8s.io/kubernetes/vendor/k8s.io/client-go/util/workqueue.ParallelizeUntil()
      staging/src/k8s.io/client-go/util/workqueue/parallelizer.go:57 +0x1a3
  k8s.io/kubernetes/pkg/scheduler/core.PrioritizeNodes()
      pkg/scheduler/core/generic_scheduler.go:682 +0x592
  k8s.io/kubernetes/pkg/scheduler/core.(*genericScheduler).Schedule()
      pkg/scheduler/core/generic_scheduler.go:186 +0x77d
  k8s.io/kubernetes/pkg/scheduler/core.TestGenericScheduler.func1()
      pkg/scheduler/core/generic_scheduler_test.go:464 +0x91f
  testing.tRunner()
      GOROOT/src/testing/testing.go:827 +0x162
==================
--- FAIL: TestGenericScheduler (0.01s)
    --- FAIL: TestGenericScheduler/test_6 (0.00s)
        testing.go:771: race detected during execution of test
    testing.go:771: race detected during execution of test
FAIL
Mike Danese 2018-11-09 11:55:24 -08:00
parent 06e737367d
commit 62c3ec969d
1 changed file with 21 additions and 16 deletions

@@ -653,33 +653,38 @@ func PrioritizeNodes(
     results := make([]schedulerapi.HostPriorityList, len(priorityConfigs), len(priorityConfigs))
-    for i := range priorityConfigs {
-        results[i] = make(schedulerapi.HostPriorityList, len(nodes))
-    }
+    // DEPRECATED: we can remove this when all priorityConfigs implement the
+    // Map-Reduce pattern.
+    workqueue.ParallelizeUntil(context.TODO(), 16, len(priorityConfigs), func(i int) {
+        priorityConfig := priorityConfigs[i]
+        if priorityConfig.Function == nil {
+            results[i] = make(schedulerapi.HostPriorityList, len(nodes))
+            return
+        }
-    processNode := func(index int) {
-        nodeInfo := nodeNameToInfo[nodes[index].Name]
         var err error
+        results[i], err = priorityConfig.Function(pod, nodeNameToInfo, nodes)
+        if err != nil {
+            appendError(err)
+        }
+    })
+    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
+        nodeInfo := nodeNameToInfo[nodes[index].Name]
         for i, priorityConfig := range priorityConfigs {
-            // DEPRECATED when ALL priorityConfigs have Map-Reduce pattern.
-            if priorityConfigs[i].Function != nil {
-                // Make sure that the old-style priority function only runs once.
-                if results[i][0].Host == "" {
-                    results[i], err = priorityConfig.Function(pod, nodeNameToInfo, nodes)
-                    if err != nil {
-                        appendError(err)
-                    }
-                }
+            if priorityConfig.Function != nil {
                 continue
             }
+            var err error
             results[i][index], err = priorityConfigs[i].Map(pod, meta, nodeInfo)
             if err != nil {
                 appendError(err)
                 results[i][index].Host = nodes[index].Name
             }
         }
-    }
-    workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), processNode)
+    })
     for i, priorityConfig := range priorityConfigs {
         if priorityConfig.Reduce == nil {
             continue