Merge pull request #52723 from k82cn/k8s_42001_5

Automatic merge from submit-queue (batch tested with PRs 52723, 53271). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Apply algorithm in scheduler by feature gates.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: part of #42001

**Release note**:
```release-note
Apply algorithm in scheduler by feature gates.
```
pull/6/head
Kubernetes Submit Queue 2017-10-03 05:15:20 -07:00 committed by GitHub
commit 0c6970bf7b
10 changed files with 118 additions and 68 deletions

View File

@ -298,6 +298,7 @@ func NewNodeController(
zoneStates: make(map[string]ZoneState),
runTaintManager: runTaintManager,
useTaintBasedEvictions: useTaintBasedEvictions && runTaintManager,
taintNodeByCondition: taintNodeByCondition,
}
if useTaintBasedEvictions {
glog.Infof("Controller is using taint based evictions.")
@ -394,6 +395,7 @@ func NewNodeController(
}
if nc.taintNodeByCondition {
glog.Infof("Controller will taint node by condition.")
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: util.CreateAddNodeHandler(func(node *v1.Node) error {
return nc.doNoScheduleTaintingPass(node)
@ -618,7 +620,7 @@ func (nc *Controller) monitorNodeStatus() error {
}
return false, nil
}); err != nil {
glog.Errorf("Update status of Node %v from Controller error : %v. "+
glog.Errorf("Update status of Node '%v' from Controller error: %v. "+
"Skipping - no pods will be evicted.", node.Name, err)
continue
}

View File

@ -36,7 +36,7 @@ import (
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options"
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"github.com/golang/glog"
@ -81,6 +81,9 @@ func Run(s *options.SchedulerServer) error {
// cache only non-terminal pods
podInformer := factory.NewPodInformer(kubecli, 0)
// Apply algorithms based on feature gates.
algorithmprovider.ApplyFeatureGates()
sched, err := CreateScheduler(
s,
kubecli,

View File

@ -16,7 +16,10 @@ go_test(
name = "go_default_test",
srcs = ["plugins_test.go"],
library = ":go_default_library",
deps = ["//plugin/pkg/scheduler/factory:go_default_library"],
deps = [
"//plugin/pkg/scheduler/factory:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)
filegroup(

View File

@ -39,7 +39,6 @@ go_test(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/informers:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",

View File

@ -60,12 +60,7 @@ func init() {
return priorities.PriorityMetadata
})
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
factory.RegisterAlgorithmProvider(factory.DefaultProvider, defaultPredicates(), defaultPriorities())
// Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, defaultPredicates(),
copyAndReplace(defaultPriorities(), "LeastRequestedPriority", "MostRequestedPriority"))
registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
// IMPORTANT NOTES for predicate developers:
// We are using cached predicate result for pods belonging to the same equivalence class.
@ -126,7 +121,7 @@ func init() {
}
func defaultPredicates() sets.String {
predSet := sets.NewString(
return sets.NewString(
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
"NoVolumeZoneConflict",
@ -182,6 +177,12 @@ func defaultPredicates() sets.String {
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
// Fit is determied by node condtions: not ready, network unavailable and out of disk.
factory.RegisterMandatoryFitPredicate("CheckNodeCondition", predicates.CheckNodeConditionPredicate),
// Fit is determined based on whether a pod can tolerate all of the node's taints
factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints),
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
"NoVolumeNodeConflict",
@ -190,19 +191,33 @@ func defaultPredicates() sets.String {
},
),
)
}
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
predSet := defaultPredicates()
if utilfeature.DefaultFeatureGate.Enabled(features.TaintNodesByCondition) {
// Remove "CheckNodeCondition" predicate
factory.RemoveFitPredicate("CheckNodeCondition")
predSet.Delete("CheckNodeCondition")
// Fit is determined based on whether a pod can tolerate all of the node's taints
predSet.Insert(factory.RegisterMandatoryFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints))
glog.Warningf("TaintNodesByCondition is enabled, PodToleratesNodeTaints predicate is mandatory")
} else {
// Fit is determied by node condtions: not ready, network unavailable and out of disk.
predSet.Insert(factory.RegisterMandatoryFitPredicate("CheckNodeCondition", predicates.CheckNodeConditionPredicate))
// Fit is determined based on whether a pod can tolerate all of the node's taints
predSet.Insert(factory.RegisterFitPredicate("PodToleratesNodeTaints", predicates.PodToleratesNodeTaints))
}
return predSet
registerAlgorithmProvider(predSet, defaultPriorities())
}
func registerAlgorithmProvider(predSet, priSet sets.String) {
// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
// by specifying flag.
factory.RegisterAlgorithmProvider(factory.DefaultProvider, predSet, priSet)
// Cluster autoscaler friendly scheduling algorithm.
factory.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
copyAndReplace(priSet, "LeastRequestedPriority", "MostRequestedPriority"))
}
func defaultPriorities() sets.String {

View File

@ -17,10 +17,10 @@ limitations under the License.
package defaults
import (
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"os"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
)
func TestGetMaxVols(t *testing.T) {
@ -106,51 +106,22 @@ func TestDefaultPriorities(t *testing.T) {
}
func TestDefaultPredicates(t *testing.T) {
testCases := []struct {
actionFunc func(value string) error
actionParam string
expected sets.String
}{
{
actionFunc: utilfeature.DefaultFeatureGate.Set,
actionParam: "TaintNodesByCondition=true",
expected: sets.NewString(
"NoVolumeZoneConflict",
"MaxEBSVolumeCount",
"MaxGCEPDVolumeCount",
"MaxAzureDiskVolumeCount",
"MatchInterPodAffinity",
"NoDiskConflict",
"GeneralPredicates",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"NoVolumeNodeConflict",
"PodToleratesNodeTaints",
),
},
{
actionFunc: utilfeature.DefaultFeatureGate.Set,
actionParam: "TaintNodesByCondition=false",
expected: sets.NewString(
"NoVolumeZoneConflict",
"MaxEBSVolumeCount",
"MaxGCEPDVolumeCount",
"MaxAzureDiskVolumeCount",
"MatchInterPodAffinity",
"NoDiskConflict",
"GeneralPredicates",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"NoVolumeNodeConflict",
"CheckNodeCondition",
"PodToleratesNodeTaints",
),
},
}
for _, testCase := range testCases {
testCase.actionFunc(testCase.actionParam)
if result := defaultPredicates(); !result.Equal(testCase.expected) {
t.Errorf("expected %v got %v", testCase.expected, result)
}
result := sets.NewString(
"NoVolumeZoneConflict",
"MaxEBSVolumeCount",
"MaxGCEPDVolumeCount",
"MaxAzureDiskVolumeCount",
"MatchInterPodAffinity",
"NoDiskConflict",
"GeneralPredicates",
"CheckNodeMemoryPressure",
"CheckNodeDiskPressure",
"NoVolumeNodeConflict",
"CheckNodeCondition",
"PodToleratesNodeTaints",
)
if expected := defaultPredicates(); !result.Equal(expected) {
t.Errorf("expected %v got %v", expected, result)
}
}

View File

@ -17,6 +17,10 @@ limitations under the License.
package algorithmprovider
import (
// Import defaults of algorithmprovider for initialization.
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults"
)
// ApplyFeatureGates applies algorithm by feature gates.
func ApplyFeatureGates() {
defaults.ApplyFeatureGates()
}

View File

@ -19,6 +19,7 @@ package algorithmprovider
import (
"testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
)
@ -63,3 +64,46 @@ func TestAlgorithmProviders(t *testing.T) {
}
}
}
func TestApplyFeatureGates(t *testing.T) {
for _, pn := range algorithmProviderNames {
p, err := factory.GetAlgorithmProvider(pn)
if err != nil {
t.Errorf("Error retrieving '%s' provider: %v", pn, err)
break
}
if !p.FitPredicateKeys.Has("CheckNodeCondition") {
t.Errorf("Failed to find predicate: 'CheckNodeCondition'")
break
}
if !p.FitPredicateKeys.Has("PodToleratesNodeTaints") {
t.Errorf("Failed to find predicate: 'PodToleratesNodeTaints'")
break
}
}
// Apply features for algorithm providers.
utilfeature.DefaultFeatureGate.Set("TaintNodesByCondition=True")
ApplyFeatureGates()
for _, pn := range algorithmProviderNames {
p, err := factory.GetAlgorithmProvider(pn)
if err != nil {
t.Errorf("Error retrieving '%s' provider: %v", pn, err)
break
}
if !p.FitPredicateKeys.Has("PodToleratesNodeTaints") {
t.Errorf("Failed to find predicate: 'PodToleratesNodeTaints'")
break
}
if p.FitPredicateKeys.Has("CheckNodeCondition") {
t.Errorf("Unexpected predicate: 'CheckNodeCondition'")
break
}
}
}

View File

@ -209,7 +209,6 @@ func NewConfigFactory(
// they may need to call.
c.scheduledPodLister = assignedPodLister{podInformer.Lister()}
// Only nodes in the "Ready" condition with status == "True" are schedulable
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addNodeToCache,

View File

@ -105,6 +105,16 @@ func RegisterFitPredicate(name string, predicate algorithm.FitPredicate) string
return RegisterFitPredicateFactory(name, func(PluginFactoryArgs) algorithm.FitPredicate { return predicate })
}
// RemoveFitPredicate removes a fit predicate from factory.
func RemoveFitPredicate(name string) {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
delete(fitPredicateMap, name)
mandatoryFitPredicates.Delete(name)
}
// RegisterMandatoryFitPredicate registers a fit predicate with the algorithm registry, the predicate is used by
// kubelet, DaemonSet; it is always included in configuration. Returns the name with which the predicate was
// registered.