mirror of https://github.com/k3s-io/k3s
Revert "Revert revert of equivalence class hash calculation in scheduler"
parent
915798d229
commit
4386751b5d
|
@ -17,8 +17,12 @@ limitations under the License.
|
|||
package predicates
|
||||
|
||||
import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
|
@ -66,6 +70,69 @@ func CreateSelectorFromLabels(aL map[string]string) labels.Selector {
|
|||
return labels.Set(aL).AsSelector()
|
||||
}
|
||||
|
||||
// EquivalencePodGenerator is a generator of equivalence class for pod with consideration of PVC info.
|
||||
type EquivalencePodGenerator struct {
|
||||
pvcInfo PersistentVolumeClaimInfo
|
||||
}
|
||||
|
||||
// NewEquivalencePodGenerator returns a getEquivalencePod method with consideration of PVC info.
|
||||
func NewEquivalencePodGenerator(pvcInfo PersistentVolumeClaimInfo) algorithm.GetEquivalencePodFunc {
|
||||
g := &EquivalencePodGenerator{
|
||||
pvcInfo: pvcInfo,
|
||||
}
|
||||
return g.getEquivalencePod
|
||||
}
|
||||
|
||||
// GetEquivalencePod returns a EquivalencePod which contains a group of pod attributes which can be reused.
|
||||
func (e *EquivalencePodGenerator) getEquivalencePod(pod *v1.Pod) interface{} {
|
||||
// For now we only consider pods:
|
||||
// 1. OwnerReferences is Controller
|
||||
// 2. with same OwnerReferences
|
||||
// 3. with same PVC claim
|
||||
// to be equivalent
|
||||
for _, ref := range pod.OwnerReferences {
|
||||
if ref.Controller != nil && *ref.Controller {
|
||||
pvcSet, err := e.getPVCSet(pod)
|
||||
if err == nil {
|
||||
// A pod can only belongs to one controller, so let's return.
|
||||
return &EquivalencePod{
|
||||
ControllerRef: ref,
|
||||
PVCSet: pvcSet,
|
||||
}
|
||||
}
|
||||
|
||||
// If error encountered, log warning and return nil (i.e. no equivalent pod found)
|
||||
glog.Warningf("[EquivalencePodGenerator] for pod: %v failed due to: %v", pod.GetName(), err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// getPVCSet returns a set of PVC UIDs of given pod.
|
||||
func (e *EquivalencePodGenerator) getPVCSet(pod *v1.Pod) (sets.String, error) {
|
||||
result := sets.NewString()
|
||||
for _, volume := range pod.Spec.Volumes {
|
||||
if volume.PersistentVolumeClaim == nil {
|
||||
continue
|
||||
}
|
||||
pvcName := volume.PersistentVolumeClaim.ClaimName
|
||||
pvc, err := e.pvcInfo.GetPersistentVolumeClaimInfo(pod.GetNamespace(), pvcName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.Insert(string(pvc.UID))
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// EquivalencePod is a group of pod attributes which can be reused as equivalence to schedule other pods.
|
||||
type EquivalencePod struct {
|
||||
ControllerRef metav1.OwnerReference
|
||||
PVCSet sets.String
|
||||
}
|
||||
|
||||
// portsConflict check whether existingPorts and wantPorts conflict with each other
|
||||
// return true if we have a conflict
|
||||
func portsConflict(existingPorts schedutil.HostPortInfo, wantPorts []*v1.ContainerPort) bool {
|
||||
|
|
|
@ -78,6 +78,9 @@ type PredicateFailureReason interface {
|
|||
GetReason() string
|
||||
}
|
||||
|
||||
// GetEquivalencePodFunc is a function that gets a EquivalencePod from a pod.
|
||||
type GetEquivalencePodFunc func(pod *v1.Pod) interface{}
|
||||
|
||||
// NodeLister interface represents anything that can list nodes for a scheduler.
|
||||
type NodeLister interface {
|
||||
// We explicitly return []*v1.Node, instead of v1.NodeList, to avoid
|
||||
|
|
|
@ -77,6 +77,13 @@ func init() {
|
|||
// Fit is determined by node selector query.
|
||||
factory.RegisterFitPredicate(predicates.MatchNodeSelectorPred, predicates.PodMatchNodeSelector)
|
||||
|
||||
// Use equivalence class to speed up heavy predicates phase.
|
||||
factory.RegisterGetEquivalencePodFunction(
|
||||
func(args factory.PluginFactoryArgs) algorithm.GetEquivalencePodFunc {
|
||||
return predicates.NewEquivalencePodGenerator(args.PVCInfo)
|
||||
},
|
||||
)
|
||||
|
||||
// ServiceSpreadingPriority is a priority config factory that spreads pods by minimizing
|
||||
// the number of pods (belonging to the same service) on the same node.
|
||||
// Register the factory so that it's available, but do not include it as part of the default priorities
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
hashutil "k8s.io/kubernetes/pkg/util/hash"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -38,7 +37,8 @@ const maxCacheEntries = 100
|
|||
// 2. function to get equivalence pod
|
||||
type EquivalenceCache struct {
|
||||
sync.RWMutex
|
||||
algorithmCache map[string]AlgorithmCache
|
||||
getEquivalencePod algorithm.GetEquivalencePodFunc
|
||||
algorithmCache map[string]AlgorithmCache
|
||||
}
|
||||
|
||||
// The AlgorithmCache stores PredicateMap with predicate name as key
|
||||
|
@ -62,11 +62,11 @@ func newAlgorithmCache() AlgorithmCache {
|
|||
}
|
||||
}
|
||||
|
||||
// NewEquivalenceCache returns EquivalenceCache to speed up predicates by caching
|
||||
// result from previous scheduling.
|
||||
func NewEquivalenceCache() *EquivalenceCache {
|
||||
// NewEquivalenceCache creates a EquivalenceCache object.
|
||||
func NewEquivalenceCache(getEquivalencePodFunc algorithm.GetEquivalencePodFunc) *EquivalenceCache {
|
||||
return &EquivalenceCache{
|
||||
algorithmCache: make(map[string]AlgorithmCache),
|
||||
getEquivalencePod: getEquivalencePodFunc,
|
||||
algorithmCache: make(map[string]AlgorithmCache),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -191,21 +191,21 @@ func (ec *EquivalenceCache) InvalidateCachedPredicateItemForPodAdd(pod *v1.Pod,
|
|||
// it will also fits to equivalence class of existing pods
|
||||
|
||||
// GeneralPredicates: will always be affected by adding a new pod
|
||||
invalidPredicates := sets.NewString(predicates.GeneralPred)
|
||||
invalidPredicates := sets.NewString("GeneralPredicates")
|
||||
|
||||
// MaxPDVolumeCountPredicate: we check the volumes of pod to make decision.
|
||||
for _, vol := range pod.Spec.Volumes {
|
||||
if vol.PersistentVolumeClaim != nil {
|
||||
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred, predicates.MaxGCEPDVolumeCountPred, predicates.MaxAzureDiskVolumeCountPred)
|
||||
invalidPredicates.Insert("MaxEBSVolumeCount", "MaxGCEPDVolumeCount", "MaxAzureDiskVolumeCount")
|
||||
} else {
|
||||
if vol.AWSElasticBlockStore != nil {
|
||||
invalidPredicates.Insert(predicates.MaxEBSVolumeCountPred)
|
||||
invalidPredicates.Insert("MaxEBSVolumeCount")
|
||||
}
|
||||
if vol.GCEPersistentDisk != nil {
|
||||
invalidPredicates.Insert(predicates.MaxGCEPDVolumeCountPred)
|
||||
invalidPredicates.Insert("MaxGCEPDVolumeCount")
|
||||
}
|
||||
if vol.AzureDisk != nil {
|
||||
invalidPredicates.Insert(predicates.MaxAzureDiskVolumeCountPred)
|
||||
invalidPredicates.Insert("MaxAzureDiskVolumeCount")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -219,11 +219,9 @@ type equivalenceClassInfo struct {
|
|||
hash uint64
|
||||
}
|
||||
|
||||
// getEquivalenceClassInfo returns a hash of the given pod.
|
||||
// The hashing function returns the same value for any two pods that are
|
||||
// equivalent from the perspective of scheduling.
|
||||
// getEquivalenceClassInfo returns the equivalence class of given pod.
|
||||
func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceClassInfo {
|
||||
equivalencePod := getEquivalenceHash(pod)
|
||||
equivalencePod := ec.getEquivalencePod(pod)
|
||||
if equivalencePod != nil {
|
||||
hash := fnv.New32a()
|
||||
hashutil.DeepHashObject(hash, equivalencePod)
|
||||
|
@ -233,60 +231,3 @@ func (ec *EquivalenceCache) getEquivalenceClassInfo(pod *v1.Pod) *equivalenceCla
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// equivalencePod is the set of pod attributes which must match for two pods to
|
||||
// be considered equivalent for scheduling purposes. For correctness, this must
|
||||
// include any Pod field which is used by a FitPredicate.
|
||||
//
|
||||
// NOTE: For equivalence hash to be formally correct, lists and maps in the
|
||||
// equivalencePod should be normalized. (e.g. by sorting them) However, the
|
||||
// vast majority of equivalent pod classes are expected to be created from a
|
||||
// single pod template, so they will all have the same ordering.
|
||||
type equivalencePod struct {
|
||||
Namespace *string
|
||||
Labels map[string]string
|
||||
Affinity *v1.Affinity
|
||||
Containers []v1.Container // See note about ordering
|
||||
InitContainers []v1.Container // See note about ordering
|
||||
NodeName *string
|
||||
NodeSelector map[string]string
|
||||
Tolerations []v1.Toleration
|
||||
Volumes []v1.Volume // See note about ordering
|
||||
}
|
||||
|
||||
// getEquivalenceHash returns the equivalencePod for a Pod.
|
||||
func getEquivalenceHash(pod *v1.Pod) *equivalencePod {
|
||||
ep := &equivalencePod{
|
||||
Namespace: &pod.Namespace,
|
||||
Labels: pod.Labels,
|
||||
Affinity: pod.Spec.Affinity,
|
||||
Containers: pod.Spec.Containers,
|
||||
InitContainers: pod.Spec.InitContainers,
|
||||
NodeName: &pod.Spec.NodeName,
|
||||
NodeSelector: pod.Spec.NodeSelector,
|
||||
Tolerations: pod.Spec.Tolerations,
|
||||
Volumes: pod.Spec.Volumes,
|
||||
}
|
||||
// DeepHashObject considers nil and empty slices to be different. Normalize them.
|
||||
if len(ep.Containers) == 0 {
|
||||
ep.Containers = nil
|
||||
}
|
||||
if len(ep.InitContainers) == 0 {
|
||||
ep.InitContainers = nil
|
||||
}
|
||||
if len(ep.Tolerations) == 0 {
|
||||
ep.Tolerations = nil
|
||||
}
|
||||
if len(ep.Volumes) == 0 {
|
||||
ep.Volumes = nil
|
||||
}
|
||||
// Normalize empty maps also.
|
||||
if len(ep.Labels) == 0 {
|
||||
ep.Labels = nil
|
||||
}
|
||||
if len(ep.NodeSelector) == 0 {
|
||||
ep.NodeSelector = nil
|
||||
}
|
||||
// TODO(misterikkit): Also normalize nested maps and slices.
|
||||
return ep
|
||||
}
|
||||
|
|
|
@ -21,132 +21,12 @@ import (
|
|||
"testing"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
)
|
||||
|
||||
// makeBasicPod returns a Pod object with many of the fields populated.
|
||||
func makeBasicPod(name string) *v1.Pod {
|
||||
isController := true
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: "test-ns",
|
||||
Labels: map[string]string{"app": "web", "env": "prod"},
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "123",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Affinity: &v1.Affinity{
|
||||
NodeAffinity: &v1.NodeAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
|
||||
NodeSelectorTerms: []v1.NodeSelectorTerm{
|
||||
{
|
||||
MatchExpressions: []v1.NodeSelectorRequirement{
|
||||
{
|
||||
Key: "failure-domain.beta.kubernetes.io/zone",
|
||||
Operator: "Exists",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
PodAffinity: &v1.PodAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
|
||||
{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{"app": "db"}},
|
||||
TopologyKey: "kubernetes.io/hostname",
|
||||
},
|
||||
},
|
||||
},
|
||||
PodAntiAffinity: &v1.PodAntiAffinity{
|
||||
RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
|
||||
{
|
||||
LabelSelector: &metav1.LabelSelector{
|
||||
MatchLabels: map[string]string{"app": "web"}},
|
||||
TopologyKey: "kubernetes.io/hostname",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
InitContainers: []v1.Container{
|
||||
{
|
||||
Name: "init-pause",
|
||||
Image: "gcr.io/google_containers/pause",
|
||||
Resources: v1.ResourceRequirements{
|
||||
Limits: v1.ResourceList{
|
||||
"cpu": resource.MustParse("1"),
|
||||
"mem": resource.MustParse("100Mi"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: "pause",
|
||||
Image: "gcr.io/google_containers/pause",
|
||||
Resources: v1.ResourceRequirements{
|
||||
Limits: v1.ResourceList{
|
||||
"cpu": resource.MustParse("1"),
|
||||
"mem": resource.MustParse("100Mi"),
|
||||
},
|
||||
},
|
||||
VolumeMounts: []v1.VolumeMount{
|
||||
{
|
||||
Name: "nfs",
|
||||
MountPath: "/srv/data",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NodeSelector: map[string]string{"node-type": "awesome"},
|
||||
Tolerations: []v1.Toleration{
|
||||
{
|
||||
Effect: "NoSchedule",
|
||||
Key: "experimental",
|
||||
Operator: "Exists",
|
||||
},
|
||||
},
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "nfs",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
NFS: &v1.NFSVolumeSource{
|
||||
Server: "nfs.corp.example.com",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type predicateItemType struct {
|
||||
fit bool
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
|
@ -190,7 +70,9 @@ func TestUpdateCachedPredicateItem(t *testing.T) {
|
|||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
ecache := NewEquivalenceCache()
|
||||
// this case does not need to calculate equivalence hash, just pass an empty function
|
||||
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
|
||||
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
|
||||
if test.expectPredicateMap {
|
||||
ecache.algorithmCache[test.nodeName] = newAlgorithmCache()
|
||||
predicateItem := HostPredicate{
|
||||
|
@ -309,7 +191,9 @@ func TestPredicateWithECache(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, test := range tests {
|
||||
ecache := NewEquivalenceCache()
|
||||
// this case does not need to calculate equivalence hash, just pass an empty function
|
||||
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
|
||||
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
|
||||
// set cached item to equivalence cache
|
||||
ecache.UpdateCachedPredicateItem(
|
||||
test.podName,
|
||||
|
@ -356,46 +240,205 @@ func TestPredicateWithECache(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestGetEquivalenceHash(t *testing.T) {
|
||||
func TestGetHashEquivalencePod(t *testing.T) {
|
||||
|
||||
ecache := NewEquivalenceCache()
|
||||
testNamespace := "test"
|
||||
|
||||
pod1 := makeBasicPod("pod1")
|
||||
pod2 := makeBasicPod("pod2")
|
||||
|
||||
pod3 := makeBasicPod("pod3")
|
||||
pod3.Spec.Volumes = []v1.Volume{
|
||||
pvcInfo := predicates.FakePersistentVolumeClaimInfo{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol111",
|
||||
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol1", Name: "someEBSVol1", Namespace: testNamespace},
|
||||
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someEBSVol1"},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol2", Name: "someEBSVol2", Namespace: testNamespace},
|
||||
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "someNonEBSVol"},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol3-0", Name: "someEBSVol3-0", Namespace: testNamespace},
|
||||
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "pvcWithDeletedPV"},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{UID: "someEBSVol3-1", Name: "someEBSVol3-1", Namespace: testNamespace},
|
||||
Spec: v1.PersistentVolumeClaimSpec{VolumeName: "anotherPVCWithDeletedPV"},
|
||||
},
|
||||
}
|
||||
|
||||
// use default equivalence class generator
|
||||
ecache := NewEquivalenceCache(predicates.NewEquivalencePodGenerator(pvcInfo))
|
||||
|
||||
isController := true
|
||||
|
||||
pod1 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod1",
|
||||
Namespace: testNamespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "123",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol1",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod4 := makeBasicPod("pod4")
|
||||
pod4.Spec.Volumes = []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol222",
|
||||
pod2 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod2",
|
||||
Namespace: testNamespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "123",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod5 := makeBasicPod("pod5")
|
||||
pod5.Spec.Volumes = []v1.Volume{}
|
||||
pod3 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod3",
|
||||
Namespace: testNamespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "567",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol3-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod6 := makeBasicPod("pod6")
|
||||
pod6.Spec.Volumes = nil
|
||||
pod4 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod4",
|
||||
Namespace: testNamespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "567",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "someEBSVol3-0",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod7 := makeBasicPod("pod7")
|
||||
pod7.Spec.NodeSelector = nil
|
||||
pod5 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod5",
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
}
|
||||
|
||||
pod8 := makeBasicPod("pod8")
|
||||
pod8.Spec.NodeSelector = make(map[string]string)
|
||||
pod6 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod6",
|
||||
Namespace: testNamespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "567",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "no-exists-pvc",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
pod7 := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod7",
|
||||
Namespace: testNamespace,
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
{
|
||||
APIVersion: "v1",
|
||||
Kind: "ReplicationController",
|
||||
Name: "rc",
|
||||
UID: "567",
|
||||
Controller: &isController,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type podInfo struct {
|
||||
pod *v1.Pod
|
||||
|
@ -403,41 +446,39 @@ func TestGetEquivalenceHash(t *testing.T) {
|
|||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
podInfoList []podInfo
|
||||
isEquivalent bool
|
||||
}{
|
||||
// pods with same controllerRef and same pvc claim
|
||||
{
|
||||
name: "pods with everything the same except name",
|
||||
podInfoList: []podInfo{
|
||||
{pod: pod1, hashIsValid: true},
|
||||
{pod: pod2, hashIsValid: true},
|
||||
},
|
||||
isEquivalent: true,
|
||||
},
|
||||
// pods with same controllerRef but different pvc claim
|
||||
{
|
||||
name: "pods that only differ in their PVC volume sources",
|
||||
podInfoList: []podInfo{
|
||||
{pod: pod3, hashIsValid: true},
|
||||
{pod: pod4, hashIsValid: true},
|
||||
},
|
||||
isEquivalent: false,
|
||||
},
|
||||
// pod without controllerRef
|
||||
{
|
||||
name: "pods that have no volumes, but one uses nil and one uses an empty slice",
|
||||
podInfoList: []podInfo{
|
||||
{pod: pod5, hashIsValid: true},
|
||||
{pod: pod6, hashIsValid: true},
|
||||
{pod: pod5, hashIsValid: false},
|
||||
},
|
||||
isEquivalent: true,
|
||||
isEquivalent: false,
|
||||
},
|
||||
// pods with same controllerRef but one has non-exists pvc claim
|
||||
{
|
||||
name: "pods that have no NodeSelector, but one uses nil and one uses an empty map",
|
||||
podInfoList: []podInfo{
|
||||
{pod: pod6, hashIsValid: false},
|
||||
{pod: pod7, hashIsValid: true},
|
||||
{pod: pod8, hashIsValid: true},
|
||||
},
|
||||
isEquivalent: true,
|
||||
isEquivalent: false,
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -447,30 +488,28 @@ func TestGetEquivalenceHash(t *testing.T) {
|
|||
)
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
for i, podInfo := range test.podInfoList {
|
||||
testPod := podInfo.pod
|
||||
eclassInfo := ecache.getEquivalenceClassInfo(testPod)
|
||||
if eclassInfo == nil && podInfo.hashIsValid {
|
||||
t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
|
||||
}
|
||||
for i, podInfo := range test.podInfoList {
|
||||
testPod := podInfo.pod
|
||||
eclassInfo := ecache.getEquivalenceClassInfo(testPod)
|
||||
if eclassInfo == nil && podInfo.hashIsValid {
|
||||
t.Errorf("Failed: pod %v is expected to have valid hash", testPod)
|
||||
}
|
||||
|
||||
if eclassInfo != nil {
|
||||
// NOTE(harry): the first element will be used as target so
|
||||
// this logic can't verify more than two inequivalent pods
|
||||
if i == 0 {
|
||||
targetHash = eclassInfo.hash
|
||||
targetPodInfo = podInfo
|
||||
} else {
|
||||
if targetHash != eclassInfo.hash {
|
||||
if test.isEquivalent {
|
||||
t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
|
||||
}
|
||||
if eclassInfo != nil {
|
||||
// NOTE(harry): the first element will be used as target so
|
||||
// this logic can't verify more than two inequivalent pods
|
||||
if i == 0 {
|
||||
targetHash = eclassInfo.hash
|
||||
targetPodInfo = podInfo
|
||||
} else {
|
||||
if targetHash != eclassInfo.hash {
|
||||
if test.isEquivalent {
|
||||
t.Errorf("Failed: pod: %v is expected to be equivalent to: %v", testPod, targetPodInfo.pod)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -515,7 +554,9 @@ func TestInvalidateCachedPredicateItemOfAllNodes(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
ecache := NewEquivalenceCache()
|
||||
// this case does not need to calculate equivalence hash, just pass an empty function
|
||||
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
|
||||
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
|
||||
|
||||
for _, test := range tests {
|
||||
// set cached item to equivalence cache
|
||||
|
@ -582,7 +623,9 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
ecache := NewEquivalenceCache()
|
||||
// this case does not need to calculate equivalence hash, just pass an empty function
|
||||
fakeGetEquivalencePodFunc := func(pod *v1.Pod) interface{} { return nil }
|
||||
ecache := NewEquivalenceCache(fakeGetEquivalencePodFunc)
|
||||
|
||||
for _, test := range tests {
|
||||
// set cached item to equivalence cache
|
||||
|
@ -606,10 +649,3 @@ func TestInvalidateAllCachedPredicateItemOfNode(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkEquivalenceHash(b *testing.B) {
|
||||
pod := makeBasicPod("test")
|
||||
for i := 0; i < b.N; i++ {
|
||||
getEquivalenceHash(pod)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -455,6 +455,10 @@ func podFitsOnNode(
|
|||
var (
|
||||
eCacheAvailable bool
|
||||
failedPredicates []algorithm.PredicateFailureReason
|
||||
invalid bool
|
||||
fit bool
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
err error
|
||||
)
|
||||
predicateResults := make(map[string]HostPredicate)
|
||||
|
||||
|
@ -490,54 +494,38 @@ func podFitsOnNode(
|
|||
// when pods are nominated or their nominations change.
|
||||
eCacheAvailable = equivCacheInfo != nil && !podsAdded
|
||||
for _, predicateKey := range predicates.Ordering() {
|
||||
var (
|
||||
fit bool
|
||||
reasons []algorithm.PredicateFailureReason
|
||||
err error
|
||||
)
|
||||
//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
|
||||
if predicate, exist := predicateFuncs[predicateKey]; exist {
|
||||
// Use an in-line function to guarantee invocation of ecache.Unlock()
|
||||
// when the in-line function returns.
|
||||
func() {
|
||||
var invalid bool
|
||||
if eCacheAvailable {
|
||||
// Lock ecache here to avoid a race condition against cache invalidation invoked
|
||||
// in event handlers. This race has existed despite locks in eCache implementation.
|
||||
ecache.Lock()
|
||||
// PredicateWithECache will return its cached predicate results.
|
||||
fit, reasons, invalid = ecache.PredicateWithECache(pod.GetName(), info.Node().GetName(), predicateKey, equivCacheInfo.hash, false)
|
||||
}
|
||||
|
||||
if !eCacheAvailable || invalid {
|
||||
// we need to execute predicate functions since equivalence cache does not work
|
||||
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
||||
if err != nil {
|
||||
return false, []algorithm.PredicateFailureReason{}, err
|
||||
}
|
||||
if eCacheAvailable {
|
||||
// Lock ecache here to avoid a race condition against cache invalidation invoked
|
||||
// in event handlers. This race has existed despite locks in equivClassCacheimplementation.
|
||||
ecache.Lock()
|
||||
defer ecache.Unlock()
|
||||
// PredicateWithECache will return its cached predicate results.
|
||||
fit, reasons, invalid = ecache.PredicateWithECache(
|
||||
pod.GetName(), info.Node().GetName(),
|
||||
predicateKey, equivCacheInfo.hash, false)
|
||||
}
|
||||
|
||||
if !eCacheAvailable || invalid {
|
||||
// we need to execute predicate functions since equivalence cache does not work
|
||||
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if eCacheAvailable {
|
||||
// Store data to update equivClassCacheafter this loop.
|
||||
if res, exists := predicateResults[predicateKey]; exists {
|
||||
res.Fit = res.Fit && fit
|
||||
res.FailReasons = append(res.FailReasons, reasons...)
|
||||
predicateResults[predicateKey] = res
|
||||
} else {
|
||||
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
|
||||
}
|
||||
result := predicateResults[predicateKey]
|
||||
ecache.UpdateCachedPredicateItem(
|
||||
pod.GetName(), info.Node().GetName(),
|
||||
predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
|
||||
// Store data to update eCache after this loop.
|
||||
if res, exists := predicateResults[predicateKey]; exists {
|
||||
res.Fit = res.Fit && fit
|
||||
res.FailReasons = append(res.FailReasons, reasons...)
|
||||
predicateResults[predicateKey] = res
|
||||
} else {
|
||||
predicateResults[predicateKey] = HostPredicate{Fit: fit, FailReasons: reasons}
|
||||
}
|
||||
result := predicateResults[predicateKey]
|
||||
ecache.UpdateCachedPredicateItem(pod.GetName(), info.Node().GetName(), predicateKey, result.Fit, result.FailReasons, equivCacheInfo.hash, false)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return false, []algorithm.PredicateFailureReason{}, err
|
||||
if eCacheAvailable {
|
||||
ecache.Unlock()
|
||||
}
|
||||
|
||||
if !fit {
|
||||
|
@ -545,9 +533,7 @@ func podFitsOnNode(
|
|||
failedPredicates = append(failedPredicates, reasons...)
|
||||
// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
|
||||
if !alwaysCheckAllPredicates {
|
||||
glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate" +
|
||||
"evaluation is short circuited and there are chances" +
|
||||
"of other predicates failing as well.")
|
||||
glog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate evaluation is short circuited and there are chances of other predicates failing as well.")
|
||||
break
|
||||
}
|
||||
}
|
||||
|
|
|
@ -136,8 +136,6 @@ type configFactory struct {
|
|||
alwaysCheckAllPredicates bool
|
||||
}
|
||||
|
||||
var _ scheduler.Configurator = &configFactory{}
|
||||
|
||||
// NewConfigFactory initializes the default implementation of a Configurator To encourage eventual privatization of the struct type, we only
|
||||
// return the interface.
|
||||
func NewConfigFactory(
|
||||
|
@ -1073,8 +1071,14 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
|
|||
}
|
||||
|
||||
// Init equivalence class cache
|
||||
if c.enableEquivalenceClassCache {
|
||||
c.equivalencePodCache = core.NewEquivalenceCache()
|
||||
if c.enableEquivalenceClassCache && getEquivalencePodFuncFactory != nil {
|
||||
pluginArgs, err := c.getPluginArgs()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.equivalencePodCache = core.NewEquivalenceCache(
|
||||
getEquivalencePodFuncFactory(*pluginArgs),
|
||||
)
|
||||
glog.Info("Created equivalence class cache")
|
||||
}
|
||||
|
||||
|
|
|
@ -75,6 +75,9 @@ type PriorityConfigFactory struct {
|
|||
Weight int
|
||||
}
|
||||
|
||||
// EquivalencePodFuncFactory produces a function to get equivalence class for given pod.
|
||||
type EquivalencePodFuncFactory func(PluginFactoryArgs) algorithm.GetEquivalencePodFunc
|
||||
|
||||
var (
|
||||
schedulerFactoryMutex sync.Mutex
|
||||
|
||||
|
@ -87,6 +90,9 @@ var (
|
|||
// Registered metadata producers
|
||||
priorityMetadataProducer PriorityMetadataProducerFactory
|
||||
predicateMetadataProducer PredicateMetadataProducerFactory
|
||||
|
||||
// get equivalence pod function
|
||||
getEquivalencePodFuncFactory EquivalencePodFuncFactory
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -340,6 +346,11 @@ func RegisterCustomPriorityFunction(policy schedulerapi.PriorityPolicy) string {
|
|||
return RegisterPriorityConfigFactory(policy.Name, *pcf)
|
||||
}
|
||||
|
||||
// RegisterGetEquivalencePodFunction registers equivalenceFuncFactory to produce equivalence class for given pod.
|
||||
func RegisterGetEquivalencePodFunction(equivalenceFuncFactory EquivalencePodFuncFactory) {
|
||||
getEquivalencePodFuncFactory = equivalenceFuncFactory
|
||||
}
|
||||
|
||||
// IsPriorityFunctionRegistered is useful for testing providers.
|
||||
func IsPriorityFunctionRegistered(name string) bool {
|
||||
schedulerFactoryMutex.Lock()
|
||||
|
|
Loading…
Reference in New Issue