diff --git a/cmd/cloud-controller-manager/app/controllermanager.go b/cmd/cloud-controller-manager/app/controllermanager.go index 9eedb9f6c2..a44c87e2f8 100644 --- a/cmd/cloud-controller-manager/app/controllermanager.go +++ b/cmd/cloud-controller-manager/app/controllermanager.go @@ -44,7 +44,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/controller" - nodecontroller "k8s.io/kubernetes/pkg/controller/cloud" + cloudcontrollers "k8s.io/kubernetes/pkg/controller/cloud" routecontroller "k8s.io/kubernetes/pkg/controller/route" servicecontroller "k8s.io/kubernetes/pkg/controller/service" "k8s.io/kubernetes/pkg/util/configz" @@ -211,7 +211,7 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc sharedInformers := informers.NewSharedInformerFactory(versionedClient, resyncPeriod(s)()) // Start the CloudNodeController - nodeController := nodecontroller.NewCloudNodeController( + nodeController := cloudcontrollers.NewCloudNodeController( sharedInformers.Core().V1().Nodes(), client("cloud-node-controller"), cloud, s.NodeMonitorPeriod.Duration, @@ -220,6 +220,12 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc nodeController.Run() time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + // Start the PersistentVolumeLabelController + pvlController := cloudcontrollers.NewPersistentVolumeLabelController(client("pvl-controller"), cloud) + threads := 5 + go pvlController.Run(threads, stop) + time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) + // Start the service controller serviceController, err := servicecontroller.New( cloud, diff --git a/examples/cloud-controller-manager/persistent-volume-label-initializer-config.yaml b/examples/cloud-controller-manager/persistent-volume-label-initializer-config.yaml new file mode 100644 index 0000000000..4a2576cc2a --- /dev/null +++ b/examples/cloud-controller-manager/persistent-volume-label-initializer-config.yaml @@ -0,0 +1,13 @@ +kind: InitializerConfiguration +apiVersion: admissionregistration.k8s.io/v1alpha1 +metadata: + name: pvlabel.kubernetes.io +initializers: + - name: pvlabel.kubernetes.io + rules: + - apiGroups: + - "" + apiVersions: + - "*" + resources: + - persistentvolumes diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD index f588d085f7..c5b1f5b010 100644 --- a/pkg/controller/cloud/BUILD +++ b/pkg/controller/cloud/BUILD @@ -8,35 +8,53 @@ load( go_library( name = "go_default_library", - srcs = ["node_controller.go"], + srcs = [ + "node_controller.go", + "pvlcontroller.go", + ], + tags = ["automanaged"], deps = [ "//pkg/api/v1/node:go_default_library", "//pkg/cloudprovider:go_default_library", + "//pkg/cloudprovider/providers/aws:go_default_library", + "//pkg/cloudprovider/providers/gce:go_default_library", + "//pkg/controller:go_default_library", "//pkg/kubelet/apis:go_default_library", "//pkg/util/node:go_default_library", + "//pkg/volume:go_default_library", "//plugin/pkg/scheduler/algorithm:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", "//vendor/k8s.io/client-go/util/retry:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["node_controller_test.go"], + srcs = [ + "node_controller_test.go", + "pvlcontroller_test.go", + ], library = ":go_default_library", deps = [ "//pkg/cloudprovider:go_default_library", + "//pkg/cloudprovider/providers/aws:go_default_library", "//pkg/cloudprovider/providers/fake:go_default_library", "//pkg/controller:go_default_library", "//pkg/controller/testutil:go_default_library", @@ -45,11 +63,13 @@ go_test( "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", "//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ], ) diff --git a/pkg/controller/cloud/pvlcontroller.go b/pkg/controller/cloud/pvlcontroller.go new file mode 100644 index 0000000000..d2c18e5902 --- /dev/null +++ b/pkg/controller/cloud/pvlcontroller.go @@ -0,0 +1,373 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/golang/glog" + + "k8s.io/api/core/v1" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + "k8s.io/kubernetes/pkg/controller" + kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" + vol "k8s.io/kubernetes/pkg/volume" +) + +const initializerName = "pvlabel.kubernetes.io" + +// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created +type PersistentVolumeLabelController struct { + // Control access to cloud volumes + mutex sync.Mutex + ebsVolumes aws.Volumes + gceCloudProvider *gce.GCECloud + + cloud cloudprovider.Interface + kubeClient kubernetes.Interface + pvlController cache.Controller + pvlIndexer cache.Indexer + volumeLister corelisters.PersistentVolumeLister + + syncHandler func(key string) error + + // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors + queue workqueue.RateLimitingInterface +} + +// NewPersistentVolumeLabelController creates a PersistentVolumeLabelController object +func NewPersistentVolumeLabelController( + kubeClient kubernetes.Interface, + cloud cloudprovider.Interface) *PersistentVolumeLabelController { + + pvlc := &PersistentVolumeLabelController{ + cloud: cloud, + kubeClient: kubeClient, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvLabels"), + } + pvlc.syncHandler = pvlc.addLabels + pvlc.pvlIndexer, pvlc.pvlController = cache.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + options.IncludeUninitialized = true + return kubeClient.CoreV1().PersistentVolumes().List(options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.IncludeUninitialized = true + return kubeClient.CoreV1().PersistentVolumes().Watch(options) + }, + }, + &v1.PersistentVolume{}, + 0, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + pvlc.queue.Add(key) + } + }, + }, + cache.Indexers{}, + ) + pvlc.volumeLister = corelisters.NewPersistentVolumeLister(pvlc.pvlIndexer) + + return pvlc +} + +// Run starts a controller that adds labels to persistent volumes +func (pvlc *PersistentVolumeLabelController) Run(threadiness int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer pvlc.queue.ShutDown() + + glog.Infof("Starting PersistentVolumeLabelController") + defer glog.Infof("Shutting down PersistentVolumeLabelController") + + go pvlc.pvlController.Run(stopCh) + + if !controller.WaitForCacheSync("persistent volume label", stopCh, pvlc.pvlController.HasSynced) { + return + } + + // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers + for i := 0; i < threadiness; i++ { + // runWorker will loop until "something bad" happens. The .Until will then rekick the worker + // after one second + go wait.Until(pvlc.runWorker, time.Second, stopCh) + } + + // wait until we're told to stop + <-stopCh +} + +func (pvlc *PersistentVolumeLabelController) runWorker() { + // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work + // available, so we don't worry about secondary waits + for pvlc.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (pvlc *PersistentVolumeLabelController) processNextWorkItem() bool { + // pull the next work item from queue. It should be a key we use to lookup something in a cache + keyObj, quit := pvlc.queue.Get() + if quit { + return false + } + // you always have to indicate to the queue that you've completed a piece of work + defer pvlc.queue.Done(keyObj) + + key := keyObj.(string) + // do your work on the key. This method will contains your "do stuff" logic + err := pvlc.syncHandler(key) + if err == nil { + // if you had no error, tell the queue to stop tracking history for your key. This will + // reset things like failure counts for per-item rate limiting + pvlc.queue.Forget(key) + return true + } + + // there was a failure so be sure to report it. This method allows for pluggable error handling + // which can be used for things like cluster-monitoring + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + + // since we failed, we should requeue the item to work on later. This method will add a backoff + // to avoid hotlooping on particular items (they're probably still not going to work right away) + // and overall controller protection (everything I've done is broken, this controller needs to + // calm down or it can starve other useful work) cases. + pvlc.queue.AddRateLimited(key) + + return true +} + +// AddLabels adds appropriate labels to persistent volumes and sets the +// volume as available if successful. +func (pvlc *PersistentVolumeLabelController) addLabels(key string) error { + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return fmt.Errorf("error getting name of volume %q to get volume from informer: %v", key, err) + } + volume, err := pvlc.volumeLister.Get(name) + if errors.IsNotFound(err) { + return nil + } else if err != nil { + return fmt.Errorf("error getting volume %s from informer: %v", name, err) + } + + return pvlc.addLabelsToVolume(volume) +} + +func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.PersistentVolume) error { + var volumeLabels map[string]string + + // Only add labels if in the list of initializers + if needsInitialization(vol.Initializers, initializerName) { + if vol.Spec.AWSElasticBlockStore != nil { + labels, err := pvlc.findAWSEBSLabels(vol) + if err != nil { + return fmt.Errorf("error querying AWS EBS volume %s: %v", vol.Spec.AWSElasticBlockStore.VolumeID, err) + } + volumeLabels = labels + } + if vol.Spec.GCEPersistentDisk != nil { + labels, err := pvlc.findGCEPDLabels(vol) + if err != nil { + return fmt.Errorf("error querying GCE PD volume %s: %v", vol.Spec.GCEPersistentDisk.PDName, err) + } + volumeLabels = labels + } + return pvlc.updateVolume(vol, volumeLabels) + } + + return nil +} + +func (pvlc *PersistentVolumeLabelController) findAWSEBSLabels(volume *v1.PersistentVolume) (map[string]string, error) { + // Ignore any volumes that are being provisioned + if volume.Spec.AWSElasticBlockStore.VolumeID == vol.ProvisionedVolumeName { + return nil, nil + } + ebsVolumes, err := pvlc.getEBSVolumes() + if err != nil { + return nil, err + } + + // TODO: GetVolumeLabels is actually a method on the Volumes interface + // If that gets standardized we can refactor to reduce code duplication + spec := aws.KubernetesVolumeID(volume.Spec.AWSElasticBlockStore.VolumeID) + labels, err := ebsVolumes.GetVolumeLabels(spec) + if err != nil { + return nil, err + } + + return labels, nil +} + +// getEBSVolumes returns the AWS Volumes interface for ebs +func (pvlc *PersistentVolumeLabelController) getEBSVolumes() (aws.Volumes, error) { + pvlc.mutex.Lock() + defer pvlc.mutex.Unlock() + + if pvlc.ebsVolumes == nil { + awsCloudProvider := pvlc.cloud.(*aws.Cloud) + awsCloudProvider, ok := pvlc.cloud.(*aws.Cloud) + if !ok { + // GetCloudProvider has gone very wrong + return nil, fmt.Errorf("error retrieving AWS cloud provider") + } + pvlc.ebsVolumes = awsCloudProvider + } + return pvlc.ebsVolumes, nil +} + +func (pvlc *PersistentVolumeLabelController) findGCEPDLabels(volume *v1.PersistentVolume) (map[string]string, error) { + // Ignore any volumes that are being provisioned + if volume.Spec.GCEPersistentDisk.PDName == vol.ProvisionedVolumeName { + return nil, nil + } + + provider, err := pvlc.getGCECloudProvider() + if err != nil { + return nil, err + } + + // If the zone is already labeled, honor the hint + zone := volume.Labels[kubeletapis.LabelZoneFailureDomain] + + labels, err := provider.GetAutoLabelsForPD(volume.Spec.GCEPersistentDisk.PDName, zone) + if err != nil { + return nil, err + } + + return labels, nil +} + +// getGCECloudProvider returns the GCE cloud provider, for use for querying volume labels +func (pvlc *PersistentVolumeLabelController) getGCECloudProvider() (*gce.GCECloud, error) { + pvlc.mutex.Lock() + defer pvlc.mutex.Unlock() + + if pvlc.gceCloudProvider == nil { + gceCloudProvider, ok := pvlc.cloud.(*gce.GCECloud) + if !ok { + // GetCloudProvider has gone very wrong + return nil, fmt.Errorf("error retrieving GCE cloud provider") + } + pvlc.gceCloudProvider = gceCloudProvider + } + return pvlc.gceCloudProvider, nil +} + +func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolume, volLabels map[string]string) ([]byte, error) { + volName := vol.Name + newVolume := vol.DeepCopyObject().(*v1.PersistentVolume) + if newVolume.Labels == nil { + newVolume.Labels = make(map[string]string) + } + for k, v := range volLabels { + newVolume.Labels[k] = v + } + newVolume.Initializers = removeInitializer(newVolume.Initializers, initializerName) + glog.V(4).Infof("removed initializer on PersistentVolume %s", newVolume.Name) + + oldData, err := json.Marshal(vol) + if err != nil { + return nil, fmt.Errorf("failed to marshal old persistentvolume %#v for persistentvolume %q: %v", vol, volName, err) + } + + newData, err := json.Marshal(newVolume) + if err != nil { + return nil, fmt.Errorf("failed to marshal new persistentvolume %#v for persistentvolume %q: %v", newVolume, volName, err) + } + + patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.PersistentVolume{}) + if err != nil { + return nil, fmt.Errorf("failed to create patch for persistentvolume %q: %v", volName, err) + } + return patch, nil +} + +func (pvlc *PersistentVolumeLabelController) updateVolume(vol *v1.PersistentVolume, volLabels map[string]string) error { + volName := vol.Name + glog.V(4).Infof("updating PersistentVolume %s", volName) + patchBytes, err := pvlc.createPatch(vol, volLabels) + if err != nil { + return err + } + + _, err = pvlc.kubeClient.Core().PersistentVolumes().Patch(string(volName), types.StrategicMergePatchType, patchBytes) + if err != nil { + return fmt.Errorf("failed to update PersistentVolume %s: %v", volName, err) + } + glog.V(4).Infof("updated PersistentVolume %s", volName) + + return err +} + +func removeInitializer(initializers *metav1.Initializers, name string) *metav1.Initializers { + if initializers == nil { + return nil + } + + var updated []metav1.Initializer + for _, pending := range initializers.Pending { + if pending.Name != name { + updated = append(updated, pending) + } + } + if len(updated) == len(initializers.Pending) { + return initializers + } + if len(updated) == 0 { + return nil + } + + return &metav1.Initializers{Pending: updated} +} + +func needsInitialization(initializers *metav1.Initializers, name string) bool { + hasInitializer := false + + if initializers != nil { + for _, pending := range initializers.Pending { + if pending.Name == name { + hasInitializer = true + break + } + } + } + return hasInitializer +} diff --git a/pkg/controller/cloud/pvlcontroller_test.go b/pkg/controller/cloud/pvlcontroller_test.go new file mode 100644 index 0000000000..89567349d3 --- /dev/null +++ b/pkg/controller/cloud/pvlcontroller_test.go @@ -0,0 +1,226 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "k8s.io/api/core/v1" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + + "k8s.io/client-go/kubernetes/fake" + core "k8s.io/client-go/testing" + + "k8s.io/kubernetes/pkg/cloudprovider/providers/aws" + fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" +) + +type mockVolumes struct { + volumeLabels map[string]string + volumeLabelsError error +} + +var _ aws.Volumes = &mockVolumes{} + +func (v *mockVolumes) AttachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName, readOnly bool) (string, error) { + return "", fmt.Errorf("not implemented") +} + +func (v *mockVolumes) DetachDisk(diskName aws.KubernetesVolumeID, nodeName types.NodeName) (string, error) { + return "", fmt.Errorf("not implemented") +} + +func (v *mockVolumes) CreateDisk(volumeOptions *aws.VolumeOptions) (volumeName aws.KubernetesVolumeID, err error) { + return "", fmt.Errorf("not implemented") +} + +func (v *mockVolumes) DeleteDisk(volumeName aws.KubernetesVolumeID) (bool, error) { + return false, fmt.Errorf("not implemented") +} + +func (v *mockVolumes) GetVolumeLabels(volumeName aws.KubernetesVolumeID) (map[string]string, error) { + return v.volumeLabels, v.volumeLabelsError +} + +func (c *mockVolumes) GetDiskPath(volumeName aws.KubernetesVolumeID) (string, error) { + return "", fmt.Errorf("not implemented") +} + +func (c *mockVolumes) DiskIsAttached(volumeName aws.KubernetesVolumeID, nodeName types.NodeName) (bool, error) { + return false, fmt.Errorf("not implemented") +} + +func (c *mockVolumes) DisksAreAttached(nodeDisks map[types.NodeName][]aws.KubernetesVolumeID) (map[types.NodeName]map[aws.KubernetesVolumeID]bool, error) { + return nil, fmt.Errorf("not implemented") +} + +func TestCreatePatch(t *testing.T) { + ignoredPV := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "noncloud", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{ + { + Name: initializerName, + }, + }, + }, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + HostPath: &v1.HostPathVolumeSource{ + Path: "/", + }, + }, + }, + } + awsPV := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "awsPV", + Initializers: &metav1.Initializers{ + Pending: []metav1.Initializer{ + { + Name: initializerName, + }, + }, + }, + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: "123", + }, + }, + }, + } + + testCases := map[string]struct { + vol v1.PersistentVolume + labels map[string]string + }{ + "non-cloud PV": { + vol: ignoredPV, + labels: nil, + }, + "no labels": { + vol: awsPV, + labels: nil, + }, + "cloudprovider returns nil, nil": { + vol: awsPV, + labels: nil, + }, + "cloudprovider labels": { + vol: awsPV, + labels: map[string]string{"a": "1", "b": "2"}, + }, + } + + for d, tc := range testCases { + cloud := &fakecloud.FakeCloud{} + client := fake.NewSimpleClientset() + pvlController := NewPersistentVolumeLabelController(client, cloud) + patch, err := pvlController.createPatch(&tc.vol, tc.labels) + if err != nil { + t.Errorf("%s: createPatch returned err: %v", d, err) + } + obj := &v1.PersistentVolume{} + json.Unmarshal(patch, obj) + if tc.labels != nil { + for k, v := range tc.labels { + if obj.ObjectMeta.Labels[k] != v { + t.Errorf("%s: label %s expected %s got %s", d, k, v, obj.ObjectMeta.Labels[k]) + } + } + } + if obj.ObjectMeta.Initializers != nil { + t.Errorf("%s: initializer wasn't removed: %v", d, obj.ObjectMeta.Initializers) + } + } +} + +func TestAddLabelsToVolume(t *testing.T) { + pv := v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "awsPV", + }, + Spec: v1.PersistentVolumeSpec{ + PersistentVolumeSource: v1.PersistentVolumeSource{ + AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{ + VolumeID: "123", + }, + }, + }, + } + + testCases := map[string]struct { + vol v1.PersistentVolume + initializers *metav1.Initializers + shouldLabel bool + }{ + "PV without initializer": { + vol: pv, + initializers: nil, + shouldLabel: false, + }, + "PV with initializer to remove": { + vol: pv, + initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: initializerName}}}, + shouldLabel: true, + }, + "PV with other initializers": { + vol: pv, + initializers: &metav1.Initializers{Pending: []metav1.Initializer{{Name: "OtherInit"}}}, + shouldLabel: false, + }, + } + + for d, tc := range testCases { + labeledCh := make(chan bool, 1) + client := fake.NewSimpleClientset() + client.PrependReactor("patch", "persistentvolumes", func(action core.Action) (handled bool, ret runtime.Object, err error) { + patch := action.(core.PatchActionImpl).GetPatch() + obj := &v1.PersistentVolume{} + json.Unmarshal(patch, obj) + if obj.ObjectMeta.Labels["a"] != "1" { + return false, nil, nil + } + labeledCh <- true + return true, nil, nil + }) + pvlController := &PersistentVolumeLabelController{kubeClient: client, ebsVolumes: &mockVolumes{volumeLabels: map[string]string{"a": "1"}}} + tc.vol.ObjectMeta.Initializers = tc.initializers + pvlController.addLabelsToVolume(&tc.vol) + + select { + case l := <-labeledCh: + if l != tc.shouldLabel { + t.Errorf("%s: label of pv failed. expected %t got %t", d, tc.shouldLabel, l) + } + case <-time.After(500 * time.Millisecond): + if tc.shouldLabel != false { + t.Errorf("%s: timed out waiting for label notification", d) + } + } + } +}