mirror of https://github.com/k3s-io/k3s
Merge pull request #71074 from jsafrane/volume-manager-races
Fix race between MountVolume and UnmountDevicepull/58/head
commit
f877b2257a
|
@ -24,11 +24,10 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/klog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/volume"
|
||||
"k8s.io/kubernetes/pkg/volume/util"
|
||||
|
@ -568,7 +567,9 @@ func (asw *actualStateOfWorld) SetVolumeGloballyMounted(
|
|||
|
||||
volumeObj.globallyMounted = globallyMounted
|
||||
volumeObj.deviceMountPath = deviceMountPath
|
||||
volumeObj.devicePath = devicePath
|
||||
if devicePath != "" {
|
||||
volumeObj.devicePath = devicePath
|
||||
}
|
||||
asw.attachedVolumes[volumeName] = volumeObj
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ go_test(
|
|||
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ import (
|
|||
"k8s.io/client-go/kubernetes/fake"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/klog"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
"k8s.io/kubernetes/pkg/kubelet/volumemanager/cache"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
|
@ -1120,7 +1121,7 @@ func createTestClient() *fake.Clientset {
|
|||
VolumesAttached: []v1.AttachedVolume{
|
||||
{
|
||||
Name: "fake-plugin/fake-device1",
|
||||
DevicePath: "fake/path",
|
||||
DevicePath: "/fake/path",
|
||||
},
|
||||
}},
|
||||
}, nil
|
||||
|
@ -1161,3 +1162,96 @@ func createtestClientWithPVPVC(pv *v1.PersistentVolume, pvc *v1.PersistentVolume
|
|||
})
|
||||
return fakeClient
|
||||
}
|
||||
|
||||
func Test_Run_Positive_VolumeMountControllerAttachEnabledRace(t *testing.T) {
|
||||
// Arrange
|
||||
volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t)
|
||||
|
||||
dsw := cache.NewDesiredStateOfWorld(volumePluginMgr)
|
||||
asw := cache.NewActualStateOfWorld(nodeName, volumePluginMgr)
|
||||
kubeClient := createTestClient()
|
||||
fakeRecorder := &record.FakeRecorder{}
|
||||
fakeHandler := volumetesting.NewBlockVolumePathHandler()
|
||||
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
|
||||
kubeClient,
|
||||
volumePluginMgr,
|
||||
fakeRecorder,
|
||||
false, /* checkNodeCapabilitiesBeforeMount */
|
||||
fakeHandler))
|
||||
reconciler := NewReconciler(
|
||||
kubeClient,
|
||||
true, /* controllerAttachDetachEnabled */
|
||||
reconcilerLoopSleepDuration,
|
||||
reconcilerSyncStatesSleepPeriod,
|
||||
waitForAttachTimeout,
|
||||
nodeName,
|
||||
dsw,
|
||||
asw,
|
||||
hasAddedPods,
|
||||
oex,
|
||||
&mount.FakeMounter{},
|
||||
volumePluginMgr,
|
||||
kubeletPodsDir)
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "pod1",
|
||||
UID: "pod1uid",
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Volumes: []v1.Volume{
|
||||
{
|
||||
Name: "volume-name",
|
||||
VolumeSource: v1.VolumeSource{
|
||||
GCEPersistentDisk: &v1.GCEPersistentDiskVolumeSource{
|
||||
PDName: "fake-device1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// Some steps are executes out of order in callbacks, follow the numbers.
|
||||
|
||||
// 1. Add a volume to DSW and wait until it's mounted
|
||||
volumeSpec := &volume.Spec{Volume: &pod.Spec.Volumes[0]}
|
||||
podName := util.GetUniquePodName(pod)
|
||||
generatedVolumeName, err := dsw.AddPodToVolume(
|
||||
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
|
||||
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
||||
|
||||
if err != nil {
|
||||
t.Fatalf("AddPodToVolume failed. Expected: <no error> Actual: <%v>", err)
|
||||
}
|
||||
runReconciler(reconciler)
|
||||
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
||||
|
||||
finished := make(chan interface{})
|
||||
fakePlugin.UnmountDeviceHook = func(mountPath string) error {
|
||||
// Act:
|
||||
// 3. While a volume is being unmounted, add it back to the desired state of world
|
||||
klog.Infof("UnmountDevice called")
|
||||
generatedVolumeName, err = dsw.AddPodToVolume(
|
||||
podName, pod, volumeSpec, volumeSpec.Name(), "" /* volumeGidValue */)
|
||||
dsw.MarkVolumesReportedInUse([]v1.UniqueVolumeName{generatedVolumeName})
|
||||
return nil
|
||||
}
|
||||
|
||||
fakePlugin.WaitForAttachHook = func(spec *volume.Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error) {
|
||||
// Assert
|
||||
// 4. When the volume is mounted again, expect that UnmountDevice operation did not clear devicePath
|
||||
if devicePath == "" {
|
||||
t.Errorf("Expected WaitForAttach called with devicePath from Node.Status")
|
||||
close(finished)
|
||||
return "", fmt.Errorf("Expected devicePath from Node.Status")
|
||||
}
|
||||
close(finished)
|
||||
return devicePath, nil
|
||||
}
|
||||
|
||||
// 2. Delete the volume from DSW (and wait for callbacks)
|
||||
dsw.DeletePodFromVolume(podName, generatedVolumeName)
|
||||
|
||||
<-finished
|
||||
waitForMount(t, fakePlugin, generatedVolumeName, asw)
|
||||
}
|
||||
|
|
|
@ -250,6 +250,10 @@ type FakeVolumePlugin struct {
|
|||
LimitKey string
|
||||
ProvisionDelaySeconds int
|
||||
|
||||
// Add callbacks as needed
|
||||
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
|
||||
UnmountDeviceHook func(globalMountPath string) error
|
||||
|
||||
Mounters []*FakeVolume
|
||||
Unmounters []*FakeVolume
|
||||
Attachers []*FakeVolume
|
||||
|
@ -269,7 +273,10 @@ var _ DeviceMountableVolumePlugin = &FakeVolumePlugin{}
|
|||
var _ FSResizableVolumePlugin = &FakeVolumePlugin{}
|
||||
|
||||
func (plugin *FakeVolumePlugin) getFakeVolume(list *[]*FakeVolume) *FakeVolume {
|
||||
volume := &FakeVolume{}
|
||||
volume := &FakeVolume{
|
||||
WaitForAttachHook: plugin.WaitForAttachHook,
|
||||
UnmountDeviceHook: plugin.UnmountDeviceHook,
|
||||
}
|
||||
*list = append(*list, volume)
|
||||
return volume
|
||||
}
|
||||
|
@ -551,6 +558,10 @@ type FakeVolume struct {
|
|||
Plugin *FakeVolumePlugin
|
||||
MetricsNil
|
||||
|
||||
// Add callbacks as needed
|
||||
WaitForAttachHook func(spec *Spec, devicePath string, pod *v1.Pod, spectimeout time.Duration) (string, error)
|
||||
UnmountDeviceHook func(globalMountPath string) error
|
||||
|
||||
SetUpCallCount int
|
||||
TearDownCallCount int
|
||||
AttachCallCount int
|
||||
|
@ -724,6 +735,9 @@ func (fv *FakeVolume) WaitForAttach(spec *Spec, devicePath string, pod *v1.Pod,
|
|||
fv.Lock()
|
||||
defer fv.Unlock()
|
||||
fv.WaitForAttachCallCount++
|
||||
if fv.WaitForAttachHook != nil {
|
||||
return fv.WaitForAttachHook(spec, devicePath, pod, spectimeout)
|
||||
}
|
||||
return "/dev/sdb", nil
|
||||
}
|
||||
|
||||
|
@ -776,6 +790,9 @@ func (fv *FakeVolume) UnmountDevice(globalMountPath string) error {
|
|||
fv.Lock()
|
||||
defer fv.Unlock()
|
||||
fv.UnmountDeviceCallCount++
|
||||
if fv.UnmountDeviceHook != nil {
|
||||
return fv.UnmountDeviceHook(globalMountPath)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue