Populate NodeAffinity on top of labels for cloud based PersistentVolumes

Signed-off-by: Deep Debroy <ddebroy@docker.com>
pull/8/head
Deep Debroy 2018-06-26 19:48:03 -07:00
parent 0d9c432542
commit 074dc6487b
8 changed files with 749 additions and 50 deletions

View File

@ -283,6 +283,20 @@ func NodeSelectorRequirementsAsFieldSelector(nsm []v1.NodeSelectorRequirement) (
return fields.AndSelectors(selectors...), nil
}
// NodeSelectorRequirementKeysExistInNodeSelectorTerms checks if a NodeSelectorTerm with key is already specified in terms
func NodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []v1.NodeSelectorRequirement, terms []v1.NodeSelectorTerm) bool {
for _, req := range reqs {
for _, term := range terms {
for _, r := range term.MatchExpressions {
if r.Key == req.Key {
return true
}
}
}
}
return false
}
// MatchNodeSelectorTerms checks whether the node labels and fields match node selector terms in ORed;
// nil or empty term matches no objects.
func MatchNodeSelectorTerms(

View File

@ -968,3 +968,193 @@ func TestMatchTopologySelectorTerms(t *testing.T) {
})
}
}
func TestNodeSelectorRequirementKeyExistsInNodeSelectorTerms(t *testing.T) {
tests := []struct {
name string
reqs []v1.NodeSelectorRequirement
terms []v1.NodeSelectorTerm
exists bool
}{
{
name: "empty set of keys in empty set of terms",
reqs: []v1.NodeSelectorRequirement{},
terms: []v1.NodeSelectorTerm{},
exists: false,
},
{
name: "key existence in terms with all keys specified",
reqs: []v1.NodeSelectorRequirement{
{
Key: "key1",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value1"},
},
{
Key: "key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value2"},
},
},
terms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value2"},
},
{
Key: "key3",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value3"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key1",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value11, test-value12"},
},
{
Key: "key4",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value41, test-value42"},
},
},
},
},
exists: true,
},
{
name: "key existence in terms with one of the keys specfied",
reqs: []v1.NodeSelectorRequirement{
{
Key: "key1",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value1"},
},
{
Key: "key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value2"},
},
{
Key: "key3",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value3"},
},
{
Key: "key6",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value6"},
},
},
terms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value2"},
}, {
Key: "key4",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value4"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key5",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value5"},
},
},
},
},
exists: true,
},
{
name: "key existence in terms without any of the keys specified",
reqs: []v1.NodeSelectorRequirement{
{
Key: "key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value2"},
},
{
Key: "key3",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value3"},
},
},
terms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key4",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value"},
},
{
Key: "key5",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key6",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "key7",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value"},
},
{
Key: "key8",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value"},
},
},
},
},
exists: false,
},
{
name: "key existence in empty set of terms",
reqs: []v1.NodeSelectorRequirement{
{
Key: "key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value2"},
},
{
Key: "key3",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value3"},
},
},
terms: []v1.NodeSelectorTerm{},
exists: false,
},
}
for _, test := range tests {
keyExists := NodeSelectorRequirementKeysExistInNodeSelectorTerms(test.reqs, test.terms)
if test.exists != keyExists {
t.Errorf("test %s failed. Expected %v but got %v", test.name, test.exists, keyExists)
}
}
}

View File

@ -15,12 +15,15 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/cloud",
deps = [
"//pkg/api/v1/node:go_default_library",
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/util/node:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -30,6 +33,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",
@ -57,11 +61,13 @@ go_test(
"//pkg/controller/testutil:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/scheme:go_default_library",

View File

@ -34,6 +34,11 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
@ -70,7 +75,7 @@ func NewPersistentVolumeLabelController(
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"),
}
pvlc.syncHandler = pvlc.addLabels
pvlc.syncHandler = pvlc.addLabelsAndAffinity
pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
@ -166,7 +171,7 @@ func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool {
// AddLabels adds appropriate labels to persistent volumes and sets the
// volume as available if successful.
func (pvlc *PersistentVolumeLabelController) addLabels(key string) error {
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinity(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err)
@ -178,10 +183,10 @@ func (pvlc *PersistentVolumeLabelController) addLabels(key string) error {
return fmt.Errorf("error getting volume %s from informer: %v", name, err)
}
return pvlc.addLabelsToVolume(volume)
return pvlc.addLabelsAndAffinityToVolume(volume)
}
func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error {
func (pvlc *PersistentVolumeLabelController) addLabelsAndAffinityToVolume(vol *v1.PersistentVolume) error {
var volumeLabels map[string]string
// Only add labels if the next pending initializer.
if needsInitialization(vol.Initializers, initializerName) {
@ -202,11 +207,52 @@ func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.Persisten
func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) {
volName := vol.Name
newVolume := vol.DeepCopyObject().(*v1.PersistentVolume)
populateAffinity := utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) && len(volLabels) != 0
if newVolume.Labels == nil {
newVolume.Labels = make(map[string]string)
}
requirements := make([]v1.NodeSelectorRequirement, 0)
for k, v := range volLabels {
newVolume.Labels[k] = v
// Set NodeSelectorRequirements based on the labels
if populateAffinity {
var values []string
if k == kubeletapis.LabelZoneFailureDomain {
zones, err := volumeutil.LabelZonesToSet(v)
if err != nil {
return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v)
}
values = zones.List()
} else {
values = []string{v}
}
requirements = append(requirements, v1.NodeSelectorRequirement{Key: k, Operator: v1.NodeSelectorOpIn, Values: values})
}
}
if populateAffinity {
if newVolume.Spec.NodeAffinity == nil {
newVolume.Spec.NodeAffinity = new(v1.VolumeNodeAffinity)
}
if newVolume.Spec.NodeAffinity.Required == nil {
newVolume.Spec.NodeAffinity.Required = new(v1.NodeSelector)
}
if len(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
// Need atleast one term pre-allocated whose MatchExpressions can be appended to
newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]v1.NodeSelectorTerm, 1)
}
// Populate NodeAffinity with requirements if there are no conflicting keys found
if v1helper.NodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
glog.V(4).Info("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
requirements, newVolume.Spec.NodeAffinity)
} else {
for _, req := range requirements {
for i := range newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms {
newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(newVolume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
}
}
}
}
newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName)
glog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name)

View File

@ -18,6 +18,7 @@ package cloud
import (
"encoding/json"
"reflect"
"testing"
"time"
@ -26,8 +27,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
)
@ -71,29 +75,292 @@ func TestCreatePatch(t *testing.T) {
},
},
}
testCases := map[string]struct {
vol v1.PersistentVolume
labels map[string]string
}{
"non-cloud PV": {
vol: ignoredPV,
labels: nil,
expectedAffinitya1b2MergedWithAWSPV := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "a",
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
{
Key: "b",
Operator: v1.NodeSelectorOpIn,
Values: []string{"2"},
},
},
},
},
},
"no labels": {
vol: awsPV,
labels: nil,
}
expectedAffinityZone1MergedWithAWSPV := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
},
},
},
},
"cloudprovider returns nil, nil": {
vol: awsPV,
labels: nil,
}
expectedAffinityZonesMergedWithAWSPV := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1", "2", "3"},
},
},
},
},
},
"cloudprovider labels": {
vol: awsPV,
labels: map[string]string{"a": "1", "b": "2"},
}
awsPVWithAffinity := v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: "awsPV",
Initializers: &metav1.Initializers{
Pending: []metav1.Initializer{
{
Name: initializerName,
},
},
},
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: "123",
},
},
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
},
},
},
},
},
},
}
expectedAffinitya1b2MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
{
Key: "a",
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
{
Key: "b",
Operator: v1.NodeSelectorOpIn,
Values: []string{"2"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
{
Key: "a",
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
{
Key: "b",
Operator: v1.NodeSelectorOpIn,
Values: []string{"2"},
},
},
},
},
},
}
expectedAffinityZone1MergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1"},
},
},
},
},
},
}
expectedAffinityZonesMergedWithAWSPVWithAffinity := v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "c",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val1", "val2"},
},
{
Key: "d",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val3"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1", "2", "3"},
},
},
},
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "e",
Operator: v1.NodeSelectorOpIn,
Values: []string{"val4", "val5"},
},
{
Key: kubeletapis.LabelZoneFailureDomain,
Operator: v1.NodeSelectorOpIn,
Values: []string{"1", "2", "3"},
},
},
},
},
},
}
zones, _ := volumeutil.ZonesToSet("1,2,3")
testCases := map[string]struct {
vol v1.PersistentVolume
labels map[string]string
expectedAffinity *v1.VolumeNodeAffinity
}{
"non-cloud PV": {
vol: ignoredPV,
labels: nil,
expectedAffinity: nil,
},
"no labels": {
vol: awsPV,
labels: nil,
expectedAffinity: nil,
},
"cloudprovider returns nil, nil": {
vol: awsPV,
labels: nil,
expectedAffinity: nil,
},
"cloudprovider labels": {
vol: awsPV,
labels: map[string]string{"a": "1", "b": "2"},
expectedAffinity: &expectedAffinitya1b2MergedWithAWSPV,
},
"cloudprovider labels pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{"a": "1", "b": "2"},
expectedAffinity: &expectedAffinitya1b2MergedWithAWSPVWithAffinity,
},
"cloudprovider labels pre-existing affinity conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{"a": "1", "c": "2"},
expectedAffinity: nil,
},
"cloudprovider singlezone": {
vol: awsPV,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"},
expectedAffinity: &expectedAffinityZone1MergedWithAWSPV,
},
"cloudprovider singlezone pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: "1"},
expectedAffinity: &expectedAffinityZone1MergedWithAWSPVWithAffinity,
},
"cloudprovider multizone": {
vol: awsPV,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)},
expectedAffinity: &expectedAffinityZonesMergedWithAWSPV,
},
"cloudprovider multizone pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{kubeletapis.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)},
expectedAffinity: &expectedAffinityZonesMergedWithAWSPVWithAffinity,
},
}
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
for d, tc := range testCases {
cloud := &fakecloud.FakeCloud{}
client := fake.NewSimpleClientset()
@ -104,16 +371,20 @@ func TestCreatePatch(t *testing.T) {
}
obj := &v1.PersistentVolume{}
json.Unmarshal(patch, obj)
if tc.labels != nil {
for k, v := range tc.labels {
if obj.ObjectMeta.Labels[k] != v {
t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k])
}
}
}
if obj.ObjectMeta.Initializers != nil {
t.Errorf("%s: initializer wasn't removed: %v", d, obj.ObjectMeta.Initializers)
}
if tc.labels == nil {
continue
}
for k, v := range tc.labels {
if obj.ObjectMeta.Labels[k] != v {
t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k])
}
}
if !reflect.DeepEqual(tc.expectedAffinity, obj.Spec.NodeAffinity) {
t.Errorf("Expected affinity %v does not match target affinity %v", tc.expectedAffinity, obj.Spec.NodeAffinity)
}
}
}
@ -132,32 +403,35 @@ func TestAddLabelsToVolume(t *testing.T) {
}
testCases := map[string]struct {
vol v1.PersistentVolume
initializers *metav1.Initializers
shouldLabel bool
vol v1.PersistentVolume
initializers *metav1.Initializers
shouldLabelAndSetAffinity bool
}{
"PV without initializer": {
vol: pv,
initializers: nil,
shouldLabel: false,
vol: pv,
initializers: nil,
shouldLabelAndSetAffinity: false,
},
"PV with initializer to remove": {
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}},
shouldLabel: true,
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}},
shouldLabelAndSetAffinity: true,
},
"PV with other initializers only": {
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}},
shouldLabel: false,
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}},
shouldLabelAndSetAffinity: false,
},
"PV with other initializers first": {
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}, {Name: initializerName}}},
shouldLabel: false,
vol: pv,
initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}, {Name: initializerName}}},
shouldLabelAndSetAffinity: false,
},
}
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
for d, tc := range testCases {
labeledCh := make(chan bool, 1)
client := fake.NewSimpleClientset()
@ -168,6 +442,22 @@ func TestAddLabelsToVolume(t *testing.T) {
if obj.ObjectMeta.Labels["a"] != "1" {
return false, nil, nil
}
if obj.Spec.NodeAffinity == nil {
return false, nil, nil
}
if obj.Spec.NodeAffinity.Required == nil {
return false, nil, nil
}
if len(obj.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
return false, nil, nil
}
reqs := obj.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions
if len(reqs) != 1 {
return false, nil, nil
}
if reqs[0].Key != "a" || reqs[0].Values[0] != "1" || reqs[0].Operator != v1.NodeSelectorOpIn {
return false, nil, nil
}
labeledCh <- true
return true, nil, nil
})
@ -177,16 +467,16 @@ func TestAddLabelsToVolume(t *testing.T) {
}
pvlController := &PersistentVolumeLabelController{kubeClient: client, cloud: fakeCloud}
tc.vol.ObjectMeta.Initializers = tc.initializers
pvlController.addLabelsToVolume(&tc.vol)
pvlController.addLabelsAndAffinityToVolume(&tc.vol)
select {
case l := <-labeledCh:
if l != tc.shouldLabel {
t.Errorf("%s: label of pv failed. expected %t got %t", d, tc.shouldLabel, l)
if l != tc.shouldLabelAndSetAffinity {
t.Errorf("%s: label and affinity setting of pv failed. expected %t got %t", d, tc.shouldLabelAndSetAffinity, l)
}
case <-time.After(500 * time.Millisecond):
if tc.shouldLabel != false {
t.Errorf("%s: timed out waiting for label notification", d)
if tc.shouldLabelAndSetAffinity != false {
t.Errorf("%s: timed out waiting for label and affinity setting notification", d)
}
}
}

View File

@ -18,10 +18,13 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/cloudprovider/providers/gce:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
@ -33,10 +36,13 @@ go_test(
deps = [
"//pkg/apis/core:go_default_library",
"//pkg/cloudprovider/providers/aws:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)

View File

@ -24,13 +24,16 @@ import (
"github.com/golang/glog"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/features"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
vol "k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
@ -79,6 +82,19 @@ func (l *persistentVolumeLabel) SetCloudConfig(cloudConfig []byte) {
l.cloudConfig = cloudConfig
}
func nodeSelectorRequirementKeysExistInNodeSelectorTerms(reqs []api.NodeSelectorRequirement, terms []api.NodeSelectorTerm) bool {
for _, req := range reqs {
for _, term := range terms {
for _, r := range term.MatchExpressions {
if r.Key == req.Key {
return true
}
}
}
}
return false
}
func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
if a.GetResource().GroupResource() != api.Resource("persistentvolumes") {
return nil
@ -108,6 +124,7 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
volumeLabels = labels
}
requirements := make([]api.NodeSelectorRequirement, 0)
if len(volumeLabels) != 0 {
if volume.Labels == nil {
volume.Labels = make(map[string]string)
@ -117,6 +134,42 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
// This should be OK because they are in the kubernetes.io namespace
// i.e. we own them
volume.Labels[k] = v
// Set NodeSelectorRequirements based on the labels
var values []string
if k == kubeletapis.LabelZoneFailureDomain {
zones, err := volumeutil.LabelZonesToSet(v)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
}
values = zones.UnsortedList()
} else {
values = []string{v}
}
requirements = append(requirements, api.NodeSelectorRequirement{Key: k, Operator: api.NodeSelectorOpIn, Values: values})
}
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeScheduling) {
if volume.Spec.NodeAffinity == nil {
volume.Spec.NodeAffinity = new(api.VolumeNodeAffinity)
}
if volume.Spec.NodeAffinity.Required == nil {
volume.Spec.NodeAffinity.Required = new(api.NodeSelector)
}
if len(volume.Spec.NodeAffinity.Required.NodeSelectorTerms) == 0 {
// Need atleast one term pre-allocated whose MatchExpressions can be appended to
volume.Spec.NodeAffinity.Required.NodeSelectorTerms = make([]api.NodeSelectorTerm, 1)
}
if nodeSelectorRequirementKeysExistInNodeSelectorTerms(requirements, volume.Spec.NodeAffinity.Required.NodeSelectorTerms) {
glog.V(4).Info("NodeSelectorRequirements for cloud labels %v conflict with existing NodeAffinity %v. Skipping addition of NodeSelectorRequirements for cloud labels.",
requirements, volume.Spec.NodeAffinity)
} else {
for _, req := range requirements {
for i := range volume.Spec.NodeAffinity.Required.NodeSelectorTerms {
volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions = append(volume.Spec.NodeAffinity.Required.NodeSelectorTerms[i].MatchExpressions, req)
}
}
}
}
}

View File

@ -20,13 +20,18 @@ import (
"testing"
"fmt"
"reflect"
"sort"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/admission"
utilfeature "k8s.io/apiserver/pkg/util/feature"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
type mockVolumes struct {
@ -83,6 +88,16 @@ func mockVolumeLabels(labels map[string]string) *mockVolumes {
return &mockVolumes{volumeLabels: labels}
}
func getNodeSelectorRequirementWithKey(key string, term api.NodeSelectorTerm) (*api.NodeSelectorRequirement, error) {
for _, r := range term.MatchExpressions {
if r.Key != key {
continue
}
return &r, nil
}
return nil, fmt.Errorf("key %s not found", key)
}
// TestAdmission
func TestAdmission(t *testing.T) {
pvHandler := newPersistentVolumeLabel()
@ -107,6 +122,8 @@ func TestAdmission(t *testing.T) {
},
},
}
utilfeature.DefaultFeatureGate.Set("VolumeScheduling=true")
defer utilfeature.DefaultFeatureGate.Set("VolumeScheduling=false")
// Non-cloud PVs are ignored
err := handler.Admit(admission.NewAttributesRecord(&ignoredPV, nil, api.Kind("PersistentVolume").WithVersion("version"), ignoredPV.Namespace, ignoredPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
@ -137,6 +154,9 @@ func TestAdmission(t *testing.T) {
if len(awsPV.ObjectMeta.Labels) != 0 {
t.Errorf("Unexpected number of labels")
}
if awsPV.Spec.NodeAffinity != nil {
t.Errorf("Unexpected NodeAffinity found")
}
// Don't panic if the cloudprovider returns nil, nil
pvHandler.ebsVolumes = mockVolumeFailure(nil)
@ -145,17 +165,45 @@ func TestAdmission(t *testing.T) {
t.Errorf("Expected no error when cloud provider returns empty labels")
}
// Labels from the cloudprovider should be applied to the volume
// Labels from the cloudprovider should be applied to the volume as labels and node affinity expressions
labels = make(map[string]string)
labels["a"] = "1"
labels["b"] = "2"
zones, _ := volumeutil.ZonesToSet("1,2,3")
labels[kubeletapis.LabelZoneFailureDomain] = volumeutil.ZonesSetToLabelValue(zones)
pvHandler.ebsVolumes = mockVolumeLabels(labels)
err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Expected no error when creating aws pv")
}
if awsPV.Labels["a"] != "1" || awsPV.Labels["b"] != "2" {
t.Errorf("Expected label a to be added when creating aws pv")
t.Errorf("Expected label a and b to be added when creating aws pv")
}
if awsPV.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if len(awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms) != 1 {
t.Errorf("Unexpected number of NodeSelectorTerms")
}
term := awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0]
if len(term.MatchExpressions) != 3 {
t.Errorf("Unexpected number of NodeSelectorRequirements in volume NodeAffinity: %d", len(term.MatchExpressions))
}
r, _ := getNodeSelectorRequirementWithKey("a", term)
if r == nil || r.Values[0] != "1" || r.Operator != api.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement a-in-1 not found in volume NodeAffinity")
}
r, _ = getNodeSelectorRequirementWithKey("b", term)
if r == nil || r.Values[0] != "2" || r.Operator != api.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement b-in-2 not found in volume NodeAffinity")
}
r, _ = getNodeSelectorRequirementWithKey(kubeletapis.LabelZoneFailureDomain, term)
if r == nil {
t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", kubeletapis.LabelZoneFailureDomain, zones)
}
sort.Strings(r.Values)
if !reflect.DeepEqual(r.Values, zones.List()) {
t.Errorf("ZoneFailureDomain elements %v does not match zone labels %v", r.Values, zones)
}
// User-provided labels should be honored, but cloudprovider labels replace them when they overlap
@ -173,4 +221,50 @@ func TestAdmission(t *testing.T) {
t.Errorf("Expected (non-conflicting) user provided labels to be honored when creating aws pv")
}
// if a conflicting affinity is already specified, leave affinity in-tact
labels = make(map[string]string)
labels["a"] = "1"
labels["b"] = "2"
labels["c"] = "3"
pvHandler.ebsVolumes = mockVolumeLabels(labels)
err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Expected no error when creating aws pv")
}
if awsPV.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if awsPV.Spec.NodeAffinity.Required == nil {
t.Errorf("Unexpected nil NodeAffinity.Required %v", awsPV.Spec.NodeAffinity.Required)
}
r, _ = getNodeSelectorRequirementWithKey("c", awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0])
if r != nil {
t.Errorf("NodeSelectorRequirement c not expected in volume NodeAffinity")
}
// if a non-conflicting affinity is specified, check for new affinity being added
labels = make(map[string]string)
labels["e"] = "1"
labels["f"] = "2"
labels["g"] = "3"
pvHandler.ebsVolumes = mockVolumeLabels(labels)
err = handler.Admit(admission.NewAttributesRecord(&awsPV, nil, api.Kind("PersistentVolume").WithVersion("version"), awsPV.Namespace, awsPV.Name, api.Resource("persistentvolumes").WithVersion("version"), "", admission.Create, nil))
if err != nil {
t.Errorf("Expected no error when creating aws pv")
}
if awsPV.Spec.NodeAffinity == nil {
t.Errorf("Unexpected nil NodeAffinity found")
}
if awsPV.Spec.NodeAffinity.Required == nil {
t.Errorf("Unexpected nil NodeAffinity.Required %v", awsPV.Spec.NodeAffinity.Required)
}
// populate old entries
labels["a"] = "1"
labels["b"] = "2"
for k, v := range labels {
r, _ = getNodeSelectorRequirementWithKey(k, awsPV.Spec.NodeAffinity.Required.NodeSelectorTerms[0])
if r == nil || r.Values[0] != v || r.Operator != api.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", k, v)
}
}
}