move getMaxVols function to predicates.go and change the params of NewMaxPDVolumeCountPredicate funcs

Signed-off-by: zhangjie <zhangjie0619@yeah.net>
pull/6/head
zhangjie 2017-09-01 17:30:24 +08:00
parent 0ee76d0258
commit 968df828a0
6 changed files with 110 additions and 97 deletions

View File

@ -18,6 +18,7 @@ go_library(
deps = [
"//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/volume/util:go_default_library",

View File

@ -19,6 +19,8 @@ package predicates
import (
"errors"
"fmt"
"os"
"strconv"
"sync"
"k8s.io/api/core/v1"
@ -31,6 +33,7 @@ import (
"k8s.io/client-go/util/workqueue"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
@ -45,6 +48,24 @@ import (
const (
MatchInterPodAffinity = "MatchInterPodAffinity"
// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
// GCE instances can have up to 16 PD volumes attached.
DefaultMaxGCEPDVolumes = 16
// DefaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure
// Larger Azure VMs can actually have much more disks attached.
// TODO We should determine the max based on VM size
DefaultMaxAzureDiskVolumes = 16
// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet
KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
// for EBSVolumeFilter
EBSVolumeFilterType = "EBS"
// for GCEPDVolumeFilter
GCEPDVolumeFilterType = "GCE"
// for AzureDiskVolumeFilter
AzureDiskVolumeFilterType = "AzureDisk"
)
// IMPORTANT NOTE for predicate developers:
@ -193,13 +214,33 @@ type VolumeFilter struct {
}
// NewMaxPDVolumeCountPredicate creates a predicate which evaluates whether a pod can fit based on the
// number of volumes which match a filter that it requests, and those that are already present. The
// maximum number is configurable to accommodate different systems.
// number of volumes which match a filter that it requests, and those that are already present.
//
// The predicate looks for both volumes used directly, as well as PVC volumes that are backed by relevant volume
// types, counts the number of unique volumes, and rejects the new pod if it would place the total count over
// the maximum.
func NewMaxPDVolumeCountPredicate(filter VolumeFilter, maxVolumes int, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
func NewMaxPDVolumeCountPredicate(filterName string, pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo) algorithm.FitPredicate {
var filter VolumeFilter
var maxVolumes int
switch filterName {
case EBSVolumeFilterType:
filter = EBSVolumeFilter
maxVolumes = getMaxVols(aws.DefaultMaxEBSVolumes)
case GCEPDVolumeFilterType:
filter = GCEPDVolumeFilter
maxVolumes = getMaxVols(DefaultMaxGCEPDVolumes)
case AzureDiskVolumeFilterType:
filter = AzureDiskVolumeFilter
maxVolumes = getMaxVols(DefaultMaxAzureDiskVolumes)
default:
glog.Fatalf("Wrong filterName, Only Support %v %v %v ", EBSVolumeFilterType,
GCEPDVolumeFilterType, AzureDiskVolumeFilterType)
return nil
}
c := &MaxPDVolumeCountChecker{
filter: filter,
maxVolumes: maxVolumes,
@ -211,7 +252,23 @@ func NewMaxPDVolumeCountPredicate(filter VolumeFilter, maxVolumes int, pvInfo Pe
return c.predicate
}
// getMaxVols checks the max PD volumes environment variable, otherwise returning a default value
func getMaxVols(defaultVal int) int {
if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" {
if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
glog.Errorf("Unable to parse maximum PD volumes value, using default of %v: %v", defaultVal, err)
} else if parsedMaxVols <= 0 {
glog.Errorf("Maximum PD volumes must be a positive value, using default of %v", defaultVal)
} else {
return parsedMaxVols
}
}
return defaultVal
}
func (c *MaxPDVolumeCountChecker) filterVolumes(volumes []v1.Volume, namespace string, filteredVolumes map[string]bool) error {
for i := range volumes {
vol := &volumes[i]
if id, ok := c.filter.FilterVolume(vol); ok {

View File

@ -18,7 +18,9 @@ package predicates
import (
"fmt"
"os"
"reflect"
"strconv"
"testing"
"k8s.io/api/core/v1"
@ -1989,24 +1991,11 @@ func TestEBSVolumeCountConflicts(t *testing.T) {
},
}
filter := VolumeFilter{
FilterVolume: func(vol *v1.Volume) (string, bool) {
if vol.AWSElasticBlockStore != nil {
return vol.AWSElasticBlockStore.VolumeID, true
}
return "", false
},
FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
if pv.Spec.AWSElasticBlockStore != nil {
return pv.Spec.AWSElasticBlockStore.VolumeID, true
}
return "", false
},
}
expectedFailureReasons := []algorithm.PredicateFailureReason{ErrMaxVolumeCountExceeded}
for _, test := range tests {
pred := NewMaxPDVolumeCountPredicate(filter, test.maxVols, pvInfo, pvcInfo)
os.Setenv(KubeMaxPDVols, strconv.Itoa(test.maxVols))
pred := NewMaxPDVolumeCountPredicate(EBSVolumeFilterType, pvInfo, pvcInfo)
fits, reasons, err := pred(test.newPod, PredicateMetadata(test.newPod, nil), schedulercache.NewNodeInfo(test.existingPods...))
if err != nil {
t.Errorf("%s: unexpected error: %v", test.test, err)
@ -3893,3 +3882,43 @@ func TestVolumeZonePredicateMultiZone(t *testing.T) {
}
}
func TestGetMaxVols(t *testing.T) {
previousValue := os.Getenv(KubeMaxPDVols)
defaultValue := 39
tests := []struct {
rawMaxVols string
expected int
test string
}{
{
rawMaxVols: "invalid",
expected: defaultValue,
test: "Unable to parse maximum PD volumes value, using default value",
},
{
rawMaxVols: "-2",
expected: defaultValue,
test: "Maximum PD volumes must be a positive value, using default value",
},
{
rawMaxVols: "40",
expected: 40,
test: "Parse maximum PD volumes value from env",
},
}
for _, test := range tests {
os.Setenv(KubeMaxPDVols, test.rawMaxVols)
result := getMaxVols(defaultValue)
if result != test.expected {
t.Errorf("%s: expected %v got %v", test.test, test.expected, result)
}
}
os.Unsetenv(KubeMaxPDVols)
if previousValue != "" {
os.Setenv(KubeMaxPDVols, previousValue)
}
}

View File

@ -11,7 +11,6 @@ go_library(
srcs = ["defaults.go"],
importpath = "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider/defaults",
deps = [
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/features:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/predicates:go_default_library",

View File

@ -17,12 +17,9 @@ limitations under the License.
package defaults
import (
"os"
"strconv"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
@ -34,19 +31,11 @@ import (
)
const (
// DefaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE
// GCE instances can have up to 16 PD volumes attached.
DefaultMaxGCEPDVolumes = 16
// DefaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure
// Larger Azure VMs can actually have much more disks attached.
// TODO We should determine the max based on VM size
DefaultMaxAzureDiskVolumes = 16
// ClusterAutoscalerProvider defines the default autoscaler provider
ClusterAutoscalerProvider = "ClusterAutoscalerProvider"
// StatefulSetKind defines the name of 'StatefulSet' kind
StatefulSetKind = "StatefulSet"
// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet
KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
)
func init() {
@ -133,27 +122,21 @@ func defaultPredicates() sets.String {
factory.RegisterFitPredicateFactory(
"MaxEBSVolumeCount",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
// TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
maxVols := getMaxVols(aws.DefaultMaxEBSVolumes)
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
return predicates.NewMaxPDVolumeCountPredicate(predicates.EBSVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many GCE PD volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxGCEPDVolumeCount",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
// TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
maxVols := getMaxVols(DefaultMaxGCEPDVolumes)
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
return predicates.NewMaxPDVolumeCountPredicate(predicates.GCEPDVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by whether or not there would be too many Azure Disk volumes attached to the node
factory.RegisterFitPredicateFactory(
"MaxAzureDiskVolumeCount",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
// TODO: allow for generically parameterized scheduler predicates, because this is a bit ugly
maxVols := getMaxVols(DefaultMaxAzureDiskVolumes)
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilter, maxVols, args.PVInfo, args.PVCInfo)
return predicates.NewMaxPDVolumeCountPredicate(predicates.AzureDiskVolumeFilterType, args.PVInfo, args.PVCInfo)
},
),
// Fit is determined by inter-pod affinity.
@ -262,21 +245,6 @@ func defaultPriorities() sets.String {
)
}
// getMaxVols checks the max PD volumes environment variable, otherwise returning a default value
func getMaxVols(defaultVal int) int {
if rawMaxVols := os.Getenv(KubeMaxPDVols); rawMaxVols != "" {
if parsedMaxVols, err := strconv.Atoi(rawMaxVols); err != nil {
glog.Errorf("Unable to parse maximum PD volumes value, using default of %v: %v", defaultVal, err)
} else if parsedMaxVols <= 0 {
glog.Errorf("Maximum PD volumes must be a positive value, using default of %v", defaultVal)
} else {
return parsedMaxVols
}
}
return defaultVal
}
func copyAndReplace(set sets.String, replaceWhat, replaceWith string) sets.String {
result := sets.NewString(set.List()...)
if result.Has(replaceWhat) {

View File

@ -17,52 +17,11 @@ limitations under the License.
package defaults
import (
"os"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
)
func TestGetMaxVols(t *testing.T) {
previousValue := os.Getenv(KubeMaxPDVols)
defaultValue := 39
tests := []struct {
rawMaxVols string
expected int
test string
}{
{
rawMaxVols: "invalid",
expected: defaultValue,
test: "Unable to parse maximum PD volumes value, using default value",
},
{
rawMaxVols: "-2",
expected: defaultValue,
test: "Maximum PD volumes must be a positive value, using default value",
},
{
rawMaxVols: "40",
expected: 40,
test: "Parse maximum PD volumes value from env",
},
}
for _, test := range tests {
os.Setenv(KubeMaxPDVols, test.rawMaxVols)
result := getMaxVols(defaultValue)
if result != test.expected {
t.Errorf("%s: expected %v got %v", test.test, test.expected, result)
}
}
os.Unsetenv(KubeMaxPDVols)
if previousValue != "" {
os.Setenv(KubeMaxPDVols, previousValue)
}
}
func TestCopyAndReplace(t *testing.T) {
testCases := []struct {
set sets.String