diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD
index b18f6c52c6..34162b8673 100644
--- a/test/e2e/storage/BUILD
+++ b/test/e2e/storage/BUILD
@@ -60,6 +60,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
         "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
diff --git a/test/e2e/storage/persistent_volumes-local.go b/test/e2e/storage/persistent_volumes-local.go
index 08d4cf0400..d6427b0b1d 100644
--- a/test/e2e/storage/persistent_volumes-local.go
+++ b/test/e2e/storage/persistent_volumes-local.go
@@ -21,6 +21,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	. "github.com/onsi/ginkgo"
@@ -29,12 +30,14 @@ import (
 	appsv1 "k8s.io/api/apps/v1"
 	"k8s.io/api/core/v1"
 	storagev1 "k8s.io/api/storage/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
 	"k8s.io/kubernetes/test/e2e/storage/utils"
@@ -432,8 +435,203 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 		})
 	})
 
+	Context("Stress with local volumes [Serial]", func() {
+		var (
+			allLocalVolumes = make(map[string][]*localTestVolume)
+			volType         = TmpfsLocalVolumeType
+			stopCh          = make(chan struct{})
+			wg              sync.WaitGroup
+		)
+
+		const (
+			volsPerNode = 10 // Make this non-divisible by volsPerPod to increase chances of partial binding failure
+			volsPerPod  = 3
+			podsFactor  = 4
+		)
+
+		BeforeEach(func() {
+			setupStorageClass(config, &waitMode)
+			for _, node := range config.nodes {
+				By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
+				allLocalVolumes[node.Name] = setupLocalVolumes(config, volType, &node, volsPerNode)
+			}
+			By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
+			var err error
+			for _, localVolumes := range allLocalVolumes {
+				for _, localVolume := range localVolumes {
+					pvConfig := makeLocalPVConfig(config, localVolume)
+					localVolume.pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
+					framework.ExpectNoError(err)
+				}
+			}
+			By("Start a goroutine to recycle unbound PVs")
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				w, err := config.client.CoreV1().PersistentVolumes().Watch(metav1.ListOptions{})
+				framework.ExpectNoError(err)
+				if w == nil {
+					return
+				}
+				defer w.Stop()
+				for {
+					select {
+					case event := <-w.ResultChan():
+						if event.Type != watch.Modified {
+							continue
+						}
+						pv, ok := event.Object.(*v1.PersistentVolume)
+						if !ok {
+							continue
+						}
+						if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
+							continue
+						}
+						pv, err = config.client.CoreV1().PersistentVolumes().Get(pv.Name, metav1.GetOptions{})
+						if apierrors.IsNotFound(err) {
+							continue
+						}
+						// Delete and create a new PV for same local volume storage
+						By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
+						for _, localVolumes := range allLocalVolumes {
+							for _, localVolume := range localVolumes {
+								if localVolume.pv.Name != pv.Name {
+									continue
+								}
+								err = config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{})
+								framework.ExpectNoError(err)
+								pvConfig := makeLocalPVConfig(config, localVolume)
+								localVolume.pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
+								framework.ExpectNoError(err)
+							}
+						}
+					case <-stopCh:
+						return
+					}
+				}
+			}()
+		})
+
+		AfterEach(func() {
+			for nodeName, localVolumes := range allLocalVolumes {
+				By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
+				cleanupLocalVolumes(config, localVolumes)
+			}
+			cleanupStorageClass(config)
+			By("Wait for recycle goroutine to finish")
+			close(stopCh)
+			wg.Wait()
+		})
+
+		It("should be able to process many pods and reuse local volumes", func() {
+			var (
+				podsLock sync.Mutex
+				// Have one extra pod pending
+				numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
+				totalPods         = numConcurrentPods * podsFactor
+				numCreated        = 0
+				numFinished       = 0
+				pods              = map[string]*v1.Pod{}
+			)
+
+			// Create pods gradually instead of all at once because scheduler has
+			// exponential backoff
+			By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
+			stop := make(chan struct{})
+			go wait.Until(func() {
+				podsLock.Lock()
+				defer podsLock.Unlock()
+
+				if numCreated >= totalPods {
+					// Created all the pods for the test
+					return
+				}
+
+				if len(pods) > numConcurrentPods/2 {
+					// Too many outstanding pods
+					return
+				}
+
+				for i := 0; i < numConcurrentPods; i++ {
+					pvcs := []*v1.PersistentVolumeClaim{}
+					for j := 0; j < volsPerPod; j++ {
+						pvc := framework.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
+						pvc, err := framework.CreatePVC(config.client, config.ns, pvc)
+						framework.ExpectNoError(err)
+						pvcs = append(pvcs, pvc)
+					}
+
+					pod := framework.MakeSecPod(config.ns, pvcs, false, "sleep 1", false, false, selinuxLabel, nil)
+					pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
+					Expect(err).NotTo(HaveOccurred())
+					pods[pod.Name] = pod
+					numCreated++
+				}
+			}, 2*time.Second, stop)
+
+			defer func() {
+				close(stop)
+				podsLock.Lock()
+				defer podsLock.Unlock()
+
+				for _, pod := range pods {
+					if err := deletePodAndPVCs(config, pod); err != nil {
+						framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
+					}
+				}
+			}()
+
+			By("Waiting for all pods to complete successfully")
+			err := wait.PollImmediate(time.Second, 5*time.Minute, func() (done bool, err error) {
+				podsList, err := config.client.CoreV1().Pods(config.ns).List(metav1.ListOptions{})
+				if err != nil {
+					return false, err
+				}
+
+				podsLock.Lock()
+				defer podsLock.Unlock()
+
+				for _, pod := range podsList.Items {
+					switch pod.Status.Phase {
+					case v1.PodSucceeded:
+						// Delete pod and its PVCs
+						if err := deletePodAndPVCs(config, &pod); err != nil {
+							return false, err
+						}
+						delete(pods, pod.Name)
+						numFinished++
+						framework.Logf("%v/%v pods finished", numFinished, totalPods)
+					case v1.PodFailed, v1.PodUnknown:
+						return false, fmt.Errorf("pod %v is in %v phase", pod.Name, pod.Status.Phase)
+					}
+				}
+
+				return numFinished == totalPods, nil
+			})
+			Expect(err).ToNot(HaveOccurred())
+		})
+	})
 })
 
+func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error {
+	framework.Logf("Deleting pod %v", pod.Name)
+	if err := config.client.CoreV1().Pods(config.ns).Delete(pod.Name, nil); err != nil {
+		return err
+	}
+
+	// Delete PVCs
+	for _, vol := range pod.Spec.Volumes {
+		pvcSource := vol.VolumeSource.PersistentVolumeClaim
+		if pvcSource != nil {
+			if err := framework.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
 
 func testPodWithNodeConflict(config *localTestConfig, testVolType localVolumeType, nodeName string, makeLocalPodFunc makeLocalPodWith, bindingMode storagev1.VolumeBindingMode) {
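The It block in the patch combines two generic patterns: a throttled producer (wait.Until creates pods in batches and backs off while too many are outstanding) and a polling consumer (wait.PollImmediate retires succeeded pods and deletes their PVCs). The following is a minimal, standard-library-only Go sketch of that shape, not part of the patch; batchSize, totalTasks and the outstanding map are illustrative stand-ins for numConcurrentPods, totalPods and the pods map.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const (
		batchSize  = 4  // stand-in for numConcurrentPods
		totalTasks = 16 // stand-in for totalPods
	)

	var (
		mu          sync.Mutex
		outstanding = map[int]bool{} // stand-in for the pods map
		created     = 0
		finished    = 0
	)

	stop := make(chan struct{})

	// Producer: on every tick, create another batch unless all tasks have been
	// created or too many are still outstanding (mirrors the wait.Until callback).
	go func() {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
				mu.Lock()
				if created < totalTasks && len(outstanding) <= batchSize/2 {
					for i := 0; i < batchSize; i++ {
						outstanding[created] = true
						created++
					}
				}
				mu.Unlock()
			}
		}
	}()

	// Consumer: poll until every task has finished (mirrors wait.PollImmediate).
	for {
		mu.Lock()
		for id := range outstanding {
			// A real test would inspect status here; the sketch just retires the task.
			delete(outstanding, id)
			finished++
		}
		done := finished == totalTasks
		mu.Unlock()
		if done {
			break
		}
		time.Sleep(50 * time.Millisecond)
	}
	close(stop)
	fmt.Printf("%d/%d tasks finished\n", finished, totalTasks)
}

As in the test, the producer deliberately trickles work in rather than creating everything up front, which keeps the number of pending items bounded and, in the e2e case, avoids piling pods onto the scheduler's exponential backoff.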