diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 779cdb718b..3f4ef57af3 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -76,6 +76,7 @@ go_library( "//pkg/controller/volume/expand:go_default_library", "//pkg/controller/volume/persistentvolume:go_default_library", "//pkg/controller/volume/pvcprotection:go_default_library", + "//pkg/controller/volume/pvprotection:go_default_library", "//pkg/features:go_default_library", "//pkg/quota/generic:go_default_library", "//pkg/quota/install:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index d5a50d77d4..c5db0d375a 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -384,6 +384,7 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc controllers["persistentvolume-expander"] = startVolumeExpandController controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController controllers["pvc-protection"] = startPVCProtectionController + controllers["pv-protection"] = startPVProtectionController return controllers } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 13618b1cbc..b800abcfe4 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/controller/volume/expand" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" "k8s.io/kubernetes/pkg/controller/volume/pvcprotection" + "k8s.io/kubernetes/pkg/controller/volume/pvprotection" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/quota/generic" quotainstall "k8s.io/kubernetes/pkg/quota/install" @@ -403,3 +404,14 @@ func startPVCProtectionController(ctx ControllerContext) (bool, error) { } return 
false, nil } + +func startPVProtectionController(ctx ControllerContext) (bool, error) { + if utilfeature.DefaultFeatureGate.Enabled(features.StorageProtection) { + go pvprotection.NewPVProtectionController( + ctx.InformerFactory.Core().V1().PersistentVolumes(), + ctx.ClientBuilder.ClientOrDie("pv-protection-controller"), + ).Run(1, ctx.Stop) + return true, nil + } + return false, nil +} diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index 7c2db32b5d..7910e5e0c7 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -136,6 +136,7 @@ filegroup( "//pkg/controller/volume/expand:all-srcs", "//pkg/controller/volume/persistentvolume:all-srcs", "//pkg/controller/volume/pvcprotection:all-srcs", + "//pkg/controller/volume/pvprotection:all-srcs", ], tags = ["automanaged"], ) diff --git a/pkg/controller/volume/pvprotection/BUILD b/pkg/controller/volume/pvprotection/BUILD new file mode 100644 index 0000000000..87ad0890ce --- /dev/null +++ b/pkg/controller/volume/pvprotection/BUILD @@ -0,0 +1,60 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["pv_protection_controller.go"], + importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection", + visibility = ["//visibility:public"], + deps = [ + "//pkg/controller:go_default_library", + "//pkg/util/metrics:go_default_library", + "//pkg/util/slice:go_default_library", + "//pkg/volume/util:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + 
"//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["pv_protection_controller_test.go"], + embed = [":go_default_library"], + importpath = "k8s.io/kubernetes/pkg/controller/volume/pvprotection", + deps = [ + "//pkg/controller:go_default_library", + "//pkg/volume/util:go_default_library", + "//vendor/github.com/davecgh/go-spew/spew:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//vendor/k8s.io/client-go/informers:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + ], +) diff --git a/pkg/controller/volume/pvprotection/pv_protection_controller.go b/pkg/controller/volume/pvprotection/pv_protection_controller.go new file mode 100644 index 0000000000..cd4b384141 --- /dev/null +++ b/pkg/controller/volume/pvprotection/pv_protection_controller.go @@ -0,0 +1,208 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pvprotection + +import ( + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/util/metrics" + "k8s.io/kubernetes/pkg/util/slice" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +// Controller is a controller that removes PVProtectionFinalizer from PVs +// that are being deleted and are not bound to PVCs. +type Controller struct { + client clientset.Interface + + pvLister corelisters.PersistentVolumeLister + pvListerSynced cache.InformerSynced + + queue workqueue.RateLimitingInterface +} + +// NewPVProtectionController returns a new *Controller.
+func NewPVProtectionController(pvInformer coreinformers.PersistentVolumeInformer, cl clientset.Interface) *Controller { + e := &Controller{ + client: cl, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pvprotection"), + } + if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil { + metrics.RegisterMetricAndTrackRateLimiterUsage("persistentvolume_protection_controller", cl.CoreV1().RESTClient().GetRateLimiter()) + } + + e.pvLister = pvInformer.Lister() + e.pvListerSynced = pvInformer.Informer().HasSynced + pvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: e.pvAddedUpdated, + UpdateFunc: func(old, new interface{}) { + e.pvAddedUpdated(new) + }, + }) + + return e +} + +// Run runs the controller goroutines. +func (c *Controller) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + glog.Infof("Starting PV protection controller") + defer glog.Infof("Shutting down PV protection controller") + + if !controller.WaitForCacheSync("PV protection", stopCh, c.pvListerSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh +} + +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one pvKey off the queue. It returns false when it's time to quit.
+func (c *Controller) processNextWorkItem() bool { + pvKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(pvKey) + + pvName := pvKey.(string) + + err := c.processPV(pvName) + if err == nil { + c.queue.Forget(pvKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("PV %v failed with : %v", pvKey, err)) + c.queue.AddRateLimited(pvKey) + + return true +} + +func (c *Controller) processPV(pvName string) error { + glog.V(4).Infof("Processing PV %s", pvName) + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished processing PV %s (%v)", pvName, time.Since(startTime)) + }() + + pv, err := c.pvLister.Get(pvName) + if apierrs.IsNotFound(err) { + glog.V(4).Infof("PV %s not found, ignoring", pvName) + return nil + } + if err != nil { + return err + } + + if isDeletionCandidate(pv) { + // PV should be deleted. Check if it's used and remove finalizer if + // it's not. + isUsed := c.isBeingUsed(pv) + if !isUsed { + return c.removeFinalizer(pv) + } + } + + if needToAddFinalizer(pv) { + // PV is not being deleted -> it should have the finalizer. The + // finalizer should be added by admission plugin, this is just to add + // the finalizer to old PVs that were created before the admission + // plugin was enabled.
+ return c.addFinalizer(pv) + } + return nil +} + +func (c *Controller) addFinalizer(pv *v1.PersistentVolume) error { + pvClone := pv.DeepCopy() + pvClone.ObjectMeta.Finalizers = append(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer) + _, err := c.client.CoreV1().PersistentVolumes().Update(pvClone) + if err != nil { + glog.V(3).Infof("Error adding protection finalizer to PV %s: %v", pv.Name, err) + return err + } + glog.V(3).Infof("Added protection finalizer to PV %s", pv.Name) + return nil +} + +func (c *Controller) removeFinalizer(pv *v1.PersistentVolume) error { + pvClone := pv.DeepCopy() + pvClone.ObjectMeta.Finalizers = slice.RemoveString(pvClone.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil) + _, err := c.client.CoreV1().PersistentVolumes().Update(pvClone) + if err != nil { + glog.V(3).Infof("Error removing protection finalizer from PV %s: %v", pv.Name, err) + return err + } + glog.V(3).Infof("Removed protection finalizer from PV %s", pv.Name) + return nil +} + +func (c *Controller) isBeingUsed(pv *v1.PersistentVolume) bool { + // check if PV is being bound to a PVC by its status + // the status will be updated by PV controller + if pv.Status.Phase == v1.VolumeBound { + // the PV is being used now + return true + } + + return false +} + +// pvAddedUpdated reacts to pv added/updated events +func (c *Controller) pvAddedUpdated(obj interface{}) { + pv, ok := obj.(*v1.PersistentVolume) + if !ok { + utilruntime.HandleError(fmt.Errorf("PV informer returned non-PV object: %#v", obj)) + return + } + glog.V(4).Infof("Got event on PV %s", pv.Name) + + if needToAddFinalizer(pv) || isDeletionCandidate(pv) { + c.queue.Add(pv.Name) + } +} + +func isDeletionCandidate(pv *v1.PersistentVolume) bool { + return pv.ObjectMeta.DeletionTimestamp != nil && slice.ContainsString(pv.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil) +} + +func needToAddFinalizer(pv *v1.PersistentVolume) bool { + return pv.ObjectMeta.DeletionTimestamp == nil &&
!slice.ContainsString(pv.ObjectMeta.Finalizers, volumeutil.PVProtectionFinalizer, nil) +} diff --git a/pkg/controller/volume/pvprotection/pv_protection_controller_test.go b/pkg/controller/volume/pvprotection/pv_protection_controller_test.go new file mode 100644 index 0000000000..8f0969a09e --- /dev/null +++ b/pkg/controller/volume/pvprotection/pv_protection_controller_test.go @@ -0,0 +1,257 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pvprotection + +import ( + "errors" + "reflect" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/golang/glog" + + "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" + "k8s.io/kubernetes/pkg/controller" + volumeutil "k8s.io/kubernetes/pkg/volume/util" +) + +const defaultPVName = "default-pv" + +type reaction struct { + verb string + resource string + reactorfn clienttesting.ReactionFunc +} + +func pv() *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultPVName, + }, + } +} + +func boundPV() *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: defaultPVName, + }, + Status: 
v1.PersistentVolumeStatus{ + Phase: v1.VolumeBound, + }, + } +} + +func withProtectionFinalizer(pv *v1.PersistentVolume) *v1.PersistentVolume { + pv.Finalizers = append(pv.Finalizers, volumeutil.PVProtectionFinalizer) + return pv +} + +func generateUpdateErrorFunc(t *testing.T, failures int) clienttesting.ReactionFunc { + i := 0 + return func(action clienttesting.Action) (bool, runtime.Object, error) { + i++ + if i <= failures { + // Update fails + update, ok := action.(clienttesting.UpdateAction) + + if !ok { + t.Fatalf("Reactor got non-update action: %+v", action) + } + acc, _ := meta.Accessor(update.GetObject()) + return true, nil, apierrors.NewForbidden(update.GetResource().GroupResource(), acc.GetName(), errors.New("Mock error")) + } + // Update succeeds + return false, nil, nil + } +} + +func deleted(pv *v1.PersistentVolume) *v1.PersistentVolume { + pv.DeletionTimestamp = &metav1.Time{} + return pv +} + +func TestPVProtectionController(t *testing.T) { + pvVer := schema.GroupVersionResource{ + Group: v1.GroupName, + Version: "v1", + Resource: "persistentvolumes", + } + tests := []struct { + name string + // Object to insert into fake kubeclient before the test starts. + initialObjects []runtime.Object + // Optional client reactors. + reactors []reaction + // PV event to simulate. This PV will be automatically added to + // initialObjects. + updatedPV *v1.PersistentVolume + // List of expected kubeclient actions that should happen during the + // test.
+ expectedActions []clienttesting.Action + }{ + // PV events + // + { + name: "PV without finalizer -> finalizer is added", + updatedPV: pv(), + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())), + }, + }, + { + name: "PV with finalizer -> no action", + updatedPV: withProtectionFinalizer(pv()), + expectedActions: []clienttesting.Action{}, + }, + { + name: "saving PV finalizer fails -> controller retries", + updatedPV: pv(), + reactors: []reaction{ + { + verb: "update", + resource: "persistentvolumes", + reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/), + }, + }, + expectedActions: []clienttesting.Action{ + // This fails + clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())), + // This fails too + clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())), + // This succeeds + clienttesting.NewUpdateAction(pvVer, "", withProtectionFinalizer(pv())), + }, + }, + { + name: "deleted PV with finalizer -> finalizer is removed", + updatedPV: deleted(withProtectionFinalizer(pv())), + expectedActions: []clienttesting.Action{ + clienttesting.NewUpdateAction(pvVer, "", deleted(pv())), + }, + }, + { + name: "finalizer removal fails -> controller retries", + updatedPV: deleted(withProtectionFinalizer(pv())), + reactors: []reaction{ + { + verb: "update", + resource: "persistentvolumes", + reactorfn: generateUpdateErrorFunc(t, 2 /* update fails twice*/), + }, + }, + expectedActions: []clienttesting.Action{ + // Fails + clienttesting.NewUpdateAction(pvVer, "", deleted(pv())), + // Fails too + clienttesting.NewUpdateAction(pvVer, "", deleted(pv())), + // Succeeds + clienttesting.NewUpdateAction(pvVer, "", deleted(pv())), + }, + }, + { + name: "deleted PV with finalizer + PV is bound -> finalizer is not removed", + updatedPV: deleted(withProtectionFinalizer(boundPV())), + expectedActions: []clienttesting.Action{}, + }, + } + + for _, test := range tests { + // Create
client with initial data + objs := test.initialObjects + if test.updatedPV != nil { + objs = append(objs, test.updatedPV) + } + + client := fake.NewSimpleClientset(objs...) + + // Create informers + informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc()) + pvInformer := informers.Core().V1().PersistentVolumes() + + // Populate the informers with initial objects so the controller can + // Get() it. + for _, obj := range objs { + switch obj.(type) { + case *v1.PersistentVolume: + pvInformer.Informer().GetStore().Add(obj) + default: + t.Fatalf("Unknown initalObject type: %+v", obj) + } + } + + // Add reactor to inject test errors. + for _, reactor := range test.reactors { + client.Fake.PrependReactor(reactor.verb, reactor.resource, reactor.reactorfn) + } + + // Create the controller + ctrl := NewPVProtectionController(pvInformer, client) + + // Start the test by simulating an event + if test.updatedPV != nil { + ctrl.pvAddedUpdated(test.updatedPV) + } + + // Process the controller queue until we get expected results + timeout := time.Now().Add(10 * time.Second) + lastReportedActionCount := 0 + for { + if time.Now().After(timeout) { + t.Errorf("Test %q: timed out", test.name) + break + } + if ctrl.queue.Len() > 0 { + glog.V(5).Infof("Test %q: %d events queue, processing one", test.name, ctrl.queue.Len()) + ctrl.processNextWorkItem() + } + if ctrl.queue.Len() > 0 { + // There is still some work in the queue, process it now + continue + } + currentActionCount := len(client.Actions()) + if currentActionCount < len(test.expectedActions) { + // Do not log every wait, only when the action count changes. + if lastReportedActionCount < currentActionCount { + glog.V(5).Infof("Test %q: got %d actions out of %d, waiting for the rest", test.name, currentActionCount, len(test.expectedActions)) + lastReportedActionCount = currentActionCount + } + // The test expected more to happen, wait for the actions.
+ // Most probably it's exponential backoff + time.Sleep(10 * time.Millisecond) + continue + } + break + } + actions := client.Actions() + + if !reflect.DeepEqual(actions, test.expectedActions) { + t.Errorf("Test %q: action not expected\nExpected:\n%s\ngot:\n%s", test.name, spew.Sdump(test.expectedActions), spew.Sdump(actions)) + } + + } + +}