mirror of https://github.com/k3s-io/k3s
Merge pull request #59350 from jsafrane/recycler-wait
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Do not recycle volumes that are used by pods **What this PR does / why we need it**: Recycler should wait until all pods that use a volume are finished. Consider this scenario: 1. User creates a PVC that's bound to a NFS PV. 2. User creates a pod that uses the PVC 3. User deletes the PVC. Now the PV gets `Released` (the PVC does not exists) and recycled, however the PV is still mounted to a running pod. PVC protection won't help us, because it puts finalizers on PVC that is under user's control and user can remove it. This PR checks that there is no pod that uses a PV before it recycles it. **Release note**: ```release-note NONE ``` /sig storagepull/6/head
commit
5cecc6ec68
|
@ -175,6 +175,7 @@ func startPersistentVolumeBinderController(ctx ControllerContext) (bool, error)
|
|||
VolumeInformer: ctx.InformerFactory.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: ctx.InformerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: ctx.InformerFactory.Storage().V1().StorageClasses(),
|
||||
PodInformer: ctx.InformerFactory.Core().V1().Pods(),
|
||||
EnableDynamicProvisioning: ctx.Options.VolumeConfiguration.EnableDynamicProvisioning,
|
||||
}
|
||||
volumeController, volumeControllerErr := persistentvolumecontroller.NewController(params)
|
||||
|
|
|
@ -32,6 +32,7 @@ go_library(
|
|||
"//pkg/util/mount:go_default_library",
|
||||
"//pkg/volume:go_default_library",
|
||||
"//pkg/volume/util:go_default_library",
|
||||
"//pkg/volume/util/volumehelper:go_default_library",
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/api/storage/v1:go_default_library",
|
||||
|
@ -42,6 +43,7 @@ go_library(
|
|||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
|
@ -96,6 +98,7 @@ go_test(
|
|||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/storage/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
|
|
|
@ -624,7 +624,7 @@ func TestSync(t *testing.T) {
|
|||
ObjectMeta: metav1.ObjectMeta{Name: classWait},
|
||||
VolumeBindingMode: &modeWait,
|
||||
},
|
||||
})
|
||||
}, []*v1.Pod{})
|
||||
}
|
||||
|
||||
func TestSyncAlphaBlockVolume(t *testing.T) {
|
||||
|
@ -776,7 +776,7 @@ func TestSyncAlphaBlockVolume(t *testing.T) {
|
|||
}
|
||||
defer utilfeature.DefaultFeatureGate.Set("BlockVolume=false")
|
||||
|
||||
runSyncTests(t, tests, []*storage.StorageClass{})
|
||||
runSyncTests(t, tests, []*storage.StorageClass{}, []*v1.Pod{})
|
||||
}
|
||||
|
||||
// Test multiple calls to syncClaim/syncVolume and periodic sync of all
|
||||
|
|
|
@ -192,7 +192,7 @@ func TestDeleteSync(t *testing.T) {
|
|||
},
|
||||
},
|
||||
}
|
||||
runSyncTests(t, tests, []*storage.StorageClass{})
|
||||
runSyncTests(t, tests, []*storage.StorageClass{}, []*v1.Pod{})
|
||||
}
|
||||
|
||||
// Test multiple calls to syncClaim/syncVolume and periodic sync of all
|
||||
|
|
|
@ -41,6 +41,7 @@ import (
|
|||
"k8s.io/client-go/informers"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
corelisters "k8s.io/client-go/listers/core/v1"
|
||||
storagelisters "k8s.io/client-go/listers/storage/v1"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
@ -609,6 +610,7 @@ func newTestController(kubeClient clientset.Interface, informerFactory informers
|
|||
VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: informerFactory.Storage().V1().StorageClasses(),
|
||||
PodInformer: informerFactory.Core().V1().Pods(),
|
||||
EventRecorder: record.NewFakeRecorder(1000),
|
||||
EnableDynamicProvisioning: enableDynamicProvisioning,
|
||||
}
|
||||
|
@ -939,7 +941,7 @@ func evaluateTestResults(ctrl *PersistentVolumeController, reactor *volumeReacto
|
|||
// 2. Call the tested function (syncClaim/syncVolume) via
|
||||
// controllerTest.testCall *once*.
|
||||
// 3. Compare resulting volumes and claims with expected volumes and claims.
|
||||
func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storage.StorageClass) {
|
||||
func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storage.StorageClass, pods []*v1.Pod) {
|
||||
for _, test := range tests {
|
||||
glog.V(4).Infof("starting test %q", test.name)
|
||||
|
||||
|
@ -966,6 +968,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
|
|||
}
|
||||
ctrl.classLister = storagelisters.NewStorageClassLister(indexer)
|
||||
|
||||
podIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
|
||||
for _, pod := range pods {
|
||||
podIndexer.Add(pod)
|
||||
}
|
||||
ctrl.podLister = corelisters.NewPodLister(podIndexer)
|
||||
|
||||
// Run the tested functions
|
||||
err = test.test(ctrl, reactor, test)
|
||||
if err != nil {
|
||||
|
|
|
@ -416,7 +416,7 @@ func TestProvisionSync(t *testing.T) {
|
|||
noerrors, wrapTestWithProvisionCalls([]provisionCall{}, testSyncClaim),
|
||||
},
|
||||
}
|
||||
runSyncTests(t, tests, storageClasses)
|
||||
runSyncTests(t, tests, storageClasses, []*v1.Pod{})
|
||||
}
|
||||
|
||||
// Test multiple calls to syncClaim/syncVolume and periodic sync of all
|
||||
|
|
|
@ -26,6 +26,8 @@ import (
|
|||
storage "k8s.io/api/storage/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/scheme"
|
||||
|
@ -43,6 +45,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
|
||||
vol "k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
@ -161,6 +164,8 @@ type PersistentVolumeController struct {
|
|||
claimListerSynced cache.InformerSynced
|
||||
classLister storagelisters.StorageClassLister
|
||||
classListerSynced cache.InformerSynced
|
||||
podLister corelisters.PodLister
|
||||
podListerSynced cache.InformerSynced
|
||||
|
||||
kubeClient clientset.Interface
|
||||
eventRecorder record.EventRecorder
|
||||
|
@ -1065,6 +1070,17 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
|
|||
glog.V(3).Infof("volume %q no longer needs recycling, skipping", volume.Name)
|
||||
return
|
||||
}
|
||||
pods, used, err := ctrl.isVolumeUsed(newVolume)
|
||||
if err != nil {
|
||||
glog.V(3).Infof("can't recycle volume %q: %v", volume.Name, err)
|
||||
return
|
||||
}
|
||||
if used {
|
||||
msg := fmt.Sprintf("Volume is used by pods: %s", strings.Join(pods, ","))
|
||||
glog.V(3).Infof("can't recycle volume %q: %s", volume.Name, msg)
|
||||
ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeFailedRecycle, msg)
|
||||
return
|
||||
}
|
||||
|
||||
// Use the newest volume copy, this will save us from version conflicts on
|
||||
// saving.
|
||||
|
@ -1233,6 +1249,32 @@ func (ctrl *PersistentVolumeController) isVolumeReleased(volume *v1.PersistentVo
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// isVolumeUsed returns list of pods that use given PV.
|
||||
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
|
||||
if pv.Spec.ClaimRef == nil {
|
||||
return nil, false, nil
|
||||
}
|
||||
claimName := pv.Spec.ClaimRef.Name
|
||||
|
||||
podNames := sets.NewString()
|
||||
pods, err := ctrl.podLister.Pods(pv.Spec.ClaimRef.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("error listing pods: %s", err)
|
||||
}
|
||||
for _, pod := range pods {
|
||||
if volumehelper.IsPodTerminated(pod, pod.Status) {
|
||||
continue
|
||||
}
|
||||
for i := range pod.Spec.Volumes {
|
||||
usedPV := &pod.Spec.Volumes[i]
|
||||
if usedPV.PersistentVolumeClaim != nil && usedPV.PersistentVolumeClaim.ClaimName == claimName {
|
||||
podNames.Insert(pod.Namespace + "/" + pod.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
return podNames.List(), podNames.Len() != 0, nil
|
||||
}
|
||||
|
||||
// doDeleteVolume finds appropriate delete plugin and deletes given volume. It
|
||||
// returns 'true', when the volume was deleted and 'false' when the volume
|
||||
// cannot be deleted because of the deleter is external. No error should be
|
||||
|
|
|
@ -62,6 +62,7 @@ type ControllerParameters struct {
|
|||
VolumeInformer coreinformers.PersistentVolumeInformer
|
||||
ClaimInformer coreinformers.PersistentVolumeClaimInformer
|
||||
ClassInformer storageinformers.StorageClassInformer
|
||||
PodInformer coreinformers.PodInformer
|
||||
EventRecorder record.EventRecorder
|
||||
EnableDynamicProvisioning bool
|
||||
}
|
||||
|
@ -119,6 +120,8 @@ func NewController(p ControllerParameters) (*PersistentVolumeController, error)
|
|||
|
||||
controller.classLister = p.ClassInformer.Lister()
|
||||
controller.classListerSynced = p.ClassInformer.Informer().HasSynced
|
||||
controller.podLister = p.PodInformer.Lister()
|
||||
controller.podListerSynced = p.PodInformer.Informer().HasSynced
|
||||
return controller, nil
|
||||
}
|
||||
|
||||
|
@ -265,7 +268,7 @@ func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
|
|||
glog.Infof("Starting persistent volume controller")
|
||||
defer glog.Infof("Shutting down peristent volume controller")
|
||||
|
||||
if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
|
||||
if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
|
||||
"k8s.io/api/core/v1"
|
||||
storage "k8s.io/api/storage/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// Test single call to syncVolume, expecting recycling to happen.
|
||||
|
@ -29,6 +30,44 @@ import (
|
|||
// 2. Call the syncVolume *once*.
|
||||
// 3. Compare resulting volumes with expected volumes.
|
||||
func TestRecycleSync(t *testing.T) {
|
||||
runningPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "runningPod",
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "vol1",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
|
||||
ClaimName: "runningClaim",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
},
|
||||
}
|
||||
|
||||
pendingPod := runningPod.DeepCopy()
|
||||
pendingPod.Name = "pendingPod"
|
||||
pendingPod.Status.Phase = v1.PodPending
|
||||
pendingPod.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = "pendingClaim"
|
||||
|
||||
completedPod := runningPod.DeepCopy()
|
||||
completedPod.Name = "completedPod"
|
||||
completedPod.Status.Phase = v1.PodSucceeded
|
||||
completedPod.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = "completedClaim"
|
||||
|
||||
pods := []*v1.Pod{
|
||||
runningPod,
|
||||
pendingPod,
|
||||
completedPod,
|
||||
}
|
||||
|
||||
tests := []controllerTest{
|
||||
{
|
||||
// recycle volume bound by controller
|
||||
|
@ -160,8 +199,38 @@ func TestRecycleSync(t *testing.T) {
|
|||
noclaims,
|
||||
[]string{"Warning VolumeUnknownReclaimPolicy"}, noerrors, testSyncVolume,
|
||||
},
|
||||
{
|
||||
// volume is used by a running pod - failure expected
|
||||
"6-11 - used by running pod",
|
||||
newVolumeArray("volume6-11", "1Gi", "uid6-11", "runningClaim", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
|
||||
newVolumeArray("volume6-11", "1Gi", "uid6-11", "runningClaim", v1.VolumeReleased, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
|
||||
noclaims,
|
||||
noclaims,
|
||||
[]string{"Normal VolumeFailedRecycle"}, noerrors, testSyncVolume,
|
||||
},
|
||||
{
|
||||
// volume is used by a pending pod - failure expected
|
||||
"6-12 - used by pending pod",
|
||||
newVolumeArray("volume6-12", "1Gi", "uid6-12", "pendingClaim", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
|
||||
newVolumeArray("volume6-12", "1Gi", "uid6-12", "pendingClaim", v1.VolumeReleased, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
|
||||
noclaims,
|
||||
noclaims,
|
||||
[]string{"Normal VolumeFailedRecycle"}, noerrors, testSyncVolume,
|
||||
},
|
||||
{
|
||||
// volume is used by a completed pod - recycle succeeds
|
||||
"6-13 - used by completed pod",
|
||||
newVolumeArray("volume6-13", "1Gi", "uid6-13", "completedClaim", v1.VolumeBound, v1.PersistentVolumeReclaimRecycle, classEmpty, annBoundByController),
|
||||
newVolumeArray("volume6-13", "1Gi", "", "", v1.VolumeAvailable, v1.PersistentVolumeReclaimRecycle, classEmpty),
|
||||
noclaims,
|
||||
noclaims,
|
||||
noevents, noerrors,
|
||||
// Inject recycler into the controller and call syncVolume. The
|
||||
// recycler simulates one recycle() call that succeeds.
|
||||
wrapTestWithReclaimCalls(operationRecycle, []error{nil}, testSyncVolume),
|
||||
},
|
||||
}
|
||||
runSyncTests(t, tests, []*storage.StorageClass{})
|
||||
runSyncTests(t, tests, []*storage.StorageClass{}, pods)
|
||||
}
|
||||
|
||||
// Test multiple calls to syncClaim/syncVolume and periodic sync of all
|
||||
|
|
|
@ -136,6 +136,7 @@ func setupNodes(t *testing.T, nsName string, numberOfNodes int) *testConfig {
|
|||
VolumeInformer: informers.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: informers.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: informers.Storage().V1().StorageClasses(),
|
||||
PodInformer: informers.Core().V1().Pods(),
|
||||
EventRecorder: nil, // TODO: add one so we can test PV events
|
||||
EnableDynamicProvisioning: true,
|
||||
}
|
||||
|
|
|
@ -1135,6 +1135,7 @@ func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPerio
|
|||
VolumeInformer: informers.Core().V1().PersistentVolumes(),
|
||||
ClaimInformer: informers.Core().V1().PersistentVolumeClaims(),
|
||||
ClassInformer: informers.Storage().V1().StorageClasses(),
|
||||
PodInformer: informers.Core().V1().Pods(),
|
||||
EnableDynamicProvisioning: true,
|
||||
})
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue