Add a new stress test to replace old provisioner stress test

pull/564/head
Yecheng Fu 2019-01-23 13:49:41 +08:00
parent b7a33511e5
commit 513ae63bf6
2 changed files with 199 additions and 0 deletions

test/e2e/storage/BUILD

@@ -60,6 +60,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",

test/e2e/storage/persistent_volumes-local.go

@@ -21,6 +21,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@@ -29,12 +30,14 @@ import (
appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
storagev1 "k8s.io/api/storage/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/storage/utils"
@@ -432,8 +435,203 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
})
})
Context("Stress with local volumes [Serial]", func() {
var (
allLocalVolumes = make(map[string][]*localTestVolume)
volType = TmpfsLocalVolumeType
stopCh = make(chan struct{})
wg sync.WaitGroup
)
const (
volsPerNode = 10 // Make this non-divisible by volsPerPod to increase chances of partial binding failure
volsPerPod = 3
podsFactor = 4
)
BeforeEach(func() {
setupStorageClass(config, &waitMode)
for _, node := range config.nodes {
By(fmt.Sprintf("Setting up %d local volumes on node %q", volsPerNode, node.Name))
allLocalVolumes[node.Name] = setupLocalVolumes(config, volType, &node, volsPerNode)
}
By(fmt.Sprintf("Create %d PVs", volsPerNode*len(config.nodes)))
var err error
for _, localVolumes := range allLocalVolumes {
for _, localVolume := range localVolumes {
pvConfig := makeLocalPVConfig(config, localVolume)
localVolume.pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
framework.ExpectNoError(err)
}
}
By("Start a goroutine to recycle unbound PVs")
wg.Add(1)
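// A local PV whose PVC is deleted moves to the Released phase and cannot
// be bound again; this goroutine recreates the PV object for the same
// backing storage so later pods can reuse it.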
go func() {
defer wg.Done()
w, err := config.client.CoreV1().PersistentVolumes().Watch(metav1.ListOptions{})
framework.ExpectNoError(err)
if w == nil {
return
}
defer w.Stop()
for {
select {
case event := <-w.ResultChan():
if event.Type != watch.Modified {
continue
}
pv, ok := event.Object.(*v1.PersistentVolume)
if !ok {
continue
}
if pv.Status.Phase == v1.VolumeBound || pv.Status.Phase == v1.VolumeAvailable {
continue
}
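// The PV is no longer usable (typically Released after its PVC was
// deleted); re-read it to make sure it still exists before recycling.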
pv, err = config.client.CoreV1().PersistentVolumes().Get(pv.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
continue
}
// Delete and create a new PV for same local volume storage
By(fmt.Sprintf("Delete %q and create a new PV for same local volume storage", pv.Name))
for _, localVolumes := range allLocalVolumes {
for _, localVolume := range localVolumes {
if localVolume.pv.Name != pv.Name {
continue
}
err = config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{})
framework.ExpectNoError(err)
pvConfig := makeLocalPVConfig(config, localVolume)
localVolume.pv, err = framework.CreatePV(config.client, framework.MakePersistentVolume(pvConfig))
framework.ExpectNoError(err)
}
}
case <-stopCh:
return
}
}
}()
})
AfterEach(func() {
for nodeName, localVolumes := range allLocalVolumes {
By(fmt.Sprintf("Cleaning up %d local volumes on node %q", len(localVolumes), nodeName))
cleanupLocalVolumes(config, localVolumes)
}
cleanupStorageClass(config)
By("Wait for recycle goroutine to finish")
close(stopCh)
wg.Wait()
})
It("should be able to process many pods and reuse local volumes", func() {
var (
podsLock sync.Mutex
// Have one extra pod pending
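// (each node fits volsPerNode/volsPerPod fully bound pods, so the +1
// keeps one pod waiting for volumes at all times)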
numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
totalPods = numConcurrentPods * podsFactor
numCreated = 0
numFinished = 0
pods = map[string]*v1.Pod{}
)
// Create pods gradually instead of all at once because scheduler has
// exponential backoff
By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
stop := make(chan struct{})
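// wait.Until re-runs this closure every 2 seconds until stop is closed,
// topping the pool back up as pods complete.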
go wait.Until(func() {
podsLock.Lock()
defer podsLock.Unlock()
if numCreated >= totalPods {
// Created all the pods for the test
return
}
if len(pods) > numConcurrentPods/2 {
// Too many outstanding pods
return
}
for i := 0; i < numConcurrentPods; i++ {
pvcs := []*v1.PersistentVolumeClaim{}
for j := 0; j < volsPerPod; j++ {
pvc := framework.MakePersistentVolumeClaim(makeLocalPVCConfig(config, volType), config.ns)
pvc, err := framework.CreatePVC(config.client, config.ns, pvc)
framework.ExpectNoError(err)
pvcs = append(pvcs, pvc)
}
pod := framework.MakeSecPod(config.ns, pvcs, false, "sleep 1", false, false, selinuxLabel, nil)
pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
Expect(err).NotTo(HaveOccurred())
pods[pod.Name] = pod
numCreated++
}
}, 2*time.Second, stop)
defer func() {
close(stop)
podsLock.Lock()
defer podsLock.Unlock()
for _, pod := range pods {
if err := deletePodAndPVCs(config, pod); err != nil {
framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
}
}
}()
By("Waiting for all pods to complete successfully")
err := wait.PollImmediate(time.Second, 5*time.Minute, func() (done bool, err error) {
podsList, err := config.client.CoreV1().Pods(config.ns).List(metav1.ListOptions{})
if err != nil {
return false, err
}
podsLock.Lock()
defer podsLock.Unlock()
for _, pod := range podsList.Items {
switch pod.Status.Phase {
case v1.PodSucceeded:
// Delete pod and its PVCs
if err := deletePodAndPVCs(config, &pod); err != nil {
return false, err
}
delete(pods, pod.Name)
numFinished++
framework.Logf("%v/%v pods finished", numFinished, totalPods)
case v1.PodFailed, v1.PodUnknown:
return false, fmt.Errorf("pod %v is in %v phase", pod.Name, pod.Status.Phase)
}
}
return numFinished == totalPods, nil
})
Expect(err).ToNot(HaveOccurred())
})
})
})
func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error {
framework.Logf("Deleting pod %v", pod.Name)
if err := config.client.CoreV1().Pods(config.ns).Delete(pod.Name, nil); err != nil {
return err
}
// Delete PVCs
for _, vol := range pod.Spec.Volumes {
pvcSource := vol.VolumeSource.PersistentVolumeClaim
if pvcSource != nil {
if err := framework.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil {
return err
}
}
}
return nil
}
type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
func testPodWithNodeConflict(config *localTestConfig, testVolType localVolumeType, nodeName string, makeLocalPodFunc makeLocalPodWith, bindingMode storagev1.VolumeBindingMode) {