mirror of https://github.com/k3s-io/k3s
Merge pull request #26153 from jsafrane/intergation-binder-stress
Automatic merge from submit-queue

volume controller: add a configurable integration test to stress the binder

The test tries to bind a configured number of PVs to the same number of PVCs. The default is 100, which should take ~1-3 seconds (depending on log level). A periodic sync is needed in rare cases, which may add another 10 seconds; the cache from #25881 will help here, and the sync should then not be needed at all.

The test is configurable and may be reused to measure binder performance. Set the KUBE_INTEGRATION_PV_* environment variables as described in persistent_volume_test.go and run the tests:

```
# compile
$ cd test/integration
$ godep go test -tags 'integration no-docker' -c

# run the tests
$ KUBE_INTEGRATION_PV_SYNC_PERIOD=10s KUBE_INTEGRATION_PV_OBJECTS=1000 time ./integration.test -test.run TestPersistentVolumeMultiPVsPVCs -v 2
```

Log level 2 is useful to get timestamps of various events, such as 'TestPersistentVolumeMultiPVsPVCs: start' and 'TestPersistentVolumeMultiPVsPVCs: claims are bound'.
commit 0418a2c0ad
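For reference, the object count is parsed with `strconv.Atoi` and the sync period with `time.ParseDuration`, so values such as `1000` and `10s` or `1m30s` are accepted. A minimal stand-alone sketch mirroring the configuration helpers added in the diff below (illustrative only, not the test code itself):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func main() {
	objects := 100 // default, as in the test
	if s := os.Getenv("KUBE_INTEGRATION_PV_OBJECTS"); s != "" {
		n, err := strconv.Atoi(s) // plain integer, e.g. "1000"
		if err != nil {
			panic(err)
		}
		objects = n
	}
	period := 10 * time.Second // default, as in the test
	if s := os.Getenv("KUBE_INTEGRATION_PV_SYNC_PERIOD"); s != "" {
		d, err := time.ParseDuration(s) // Go duration syntax: "10s", "1m30s", ...
		if err != nil {
			panic(err)
		}
		period = d
	}
	fmt.Printf("objects=%d syncPeriod=%v\n", objects, period)
}
```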
```diff
@@ -22,10 +22,13 @@ import (
 	"fmt"
+	"math/rand"
 	"net/http/httptest"
+	"os"
+	"strconv"
 	"testing"
 	"time"
 
 	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/resource"
 	"k8s.io/kubernetes/pkg/api/testapi"
```
```diff
@@ -44,6 +47,61 @@ func init() {
 	requireEtcd()
 }
 
+// Several tests in this file are configurable by environment variables:
+// KUBE_INTEGRATION_PV_OBJECTS - number of PVs/PVCs to be created
+//                               (100 by default)
+// KUBE_INTEGRATION_PV_SYNC_PERIOD - volume controller sync period
+//                                   (10s by default)
+// KUBE_INTEGRATION_PV_END_SLEEP - how long TestPersistentVolumeMultiPVsPVCs
+//    should sleep when it is finished (0s by default). This is useful to
+//    test how long it takes for the periodic sync to process bound
+//    PVs/PVCs.
+//
+const defaultObjectCount = 100
+const defaultSyncPeriod = 10 * time.Second
+
+func getObjectCount() int {
+	objectCount := defaultObjectCount
+	if s := os.Getenv("KUBE_INTEGRATION_PV_OBJECTS"); s != "" {
+		var err error
+		objectCount, err = strconv.Atoi(s)
+		if err != nil {
+			glog.Fatalf("cannot parse value of KUBE_INTEGRATION_PV_OBJECTS: %v", err)
+		}
+	}
+	glog.V(2).Infof("using KUBE_INTEGRATION_PV_OBJECTS=%d", objectCount)
+	return objectCount
+}
+
+func getSyncPeriod() time.Duration {
+	period := defaultSyncPeriod
+	if s := os.Getenv("KUBE_INTEGRATION_PV_SYNC_PERIOD"); s != "" {
+		var err error
+		period, err = time.ParseDuration(s)
+		if err != nil {
+			glog.Fatalf("cannot parse value of KUBE_INTEGRATION_PV_SYNC_PERIOD: %v", err)
+		}
+	}
+	glog.V(2).Infof("using KUBE_INTEGRATION_PV_SYNC_PERIOD=%v", period)
+	return period
+}
+
+func testSleep() {
+	var period time.Duration
+	if s := os.Getenv("KUBE_INTEGRATION_PV_END_SLEEP"); s != "" {
+		var err error
+		period, err = time.ParseDuration(s)
+		if err != nil {
+			glog.Fatalf("cannot parse value of KUBE_INTEGRATION_PV_END_SLEEP: %v", err)
+		}
+	}
+	glog.V(2).Infof("using KUBE_INTEGRATION_PV_END_SLEEP=%v", period)
+	if period != 0 {
+		time.Sleep(period)
+		glog.V(2).Infof("sleep finished")
+	}
+}
+
 func TestPersistentVolumeRecycler(t *testing.T) {
 	_, s := framework.RunAMaster(t)
 	defer s.Close()
```
```diff
@@ -72,16 +130,16 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 	}
 
 	// wait until the controller pairs the volume and claim
-	waitForPersistentVolumePhase(watchPV, api.VolumeBound)
-	waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
+	waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeBound)
+	waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound)
 
 	// deleting a claim releases the volume, after which it can be recycled
 	if err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Delete(pvc.Name, nil); err != nil {
 		t.Errorf("error deleting claim %s", pvc.Name)
 	}
 
-	waitForPersistentVolumePhase(watchPV, api.VolumeReleased)
-	waitForPersistentVolumePhase(watchPV, api.VolumeAvailable)
+	waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeReleased)
+	waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeAvailable)
 
 	// end of Recycler test.
 	// Deleter test begins now.
```
```diff
@@ -100,16 +158,15 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 	if err != nil {
 		t.Errorf("Failed to create PersistentVolumeClaim: %v", err)
 	}
 
-	waitForPersistentVolumePhase(watchPV, api.VolumeBound)
-	waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
+	waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeBound)
+	waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound)
 
 	// deleting a claim releases the volume, after which it can be recycled
 	if err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Delete(pvc.Name, nil); err != nil {
 		t.Errorf("error deleting claim %s", pvc.Name)
 	}
 
-	waitForPersistentVolumePhase(watchPV, api.VolumeReleased)
+	waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeReleased)
 
 	for {
 		event := <-watchPV.ResultChan()
```
```diff
@@ -149,8 +206,8 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 		t.Fatalf("Unexpected error creating pv: %v", err)
 	}
 
-	waitForPersistentVolumePhase(watchPV, api.VolumeBound)
-	waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
+	waitForPersistentVolumePhase(testClient, pv.Name, watchPV, api.VolumeBound)
+	waitForAnyPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
 
 	pv, err = testClient.PersistentVolumes().Get(pv.Name)
 	if err != nil {
```
```diff
@@ -164,6 +221,8 @@ func TestPersistentVolumeRecycler(t *testing.T) {
 	}
 }
 
+// TestPersistentVolumeMultiPVs tests binding of one PVC to 100 PVs with
+// different sizes.
 func TestPersistentVolumeMultiPVs(t *testing.T) {
 	_, s := framework.RunAMaster(t)
 	defer s.Close()
```
```diff
@@ -176,7 +235,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
 	controller.Run()
 	defer controller.Stop()
 
-	maxPVs := 100
+	maxPVs := getObjectCount()
 	pvs := make([]*api.PersistentVolume, maxPVs)
 	for i := 0; i < maxPVs; i++ {
 		// This PV will be claimed, released, and deleted
```
```diff
@@ -200,10 +259,10 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
 	}
 	t.Log("claim created")
 
-	// wait until the controller pairs the volume and claim
-	waitForPersistentVolumePhase(watchPV, api.VolumeBound)
+	// wait until the binder pairs the claim with a volume
+	waitForAnyPersistentVolumePhase(watchPV, api.VolumeBound)
 	t.Log("volume bound")
-	waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
+	waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound)
 	t.Log("claim bound")
 
 	// only one PV is bound
```
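A note on the switch to `waitForAnyPersistentVolumePhase`: all waiters share one watch channel, so a per-name wait would consume and silently drop events meant for other objects. The stress tests therefore count matching events regardless of which object they belong to, once per expected object. A stand-alone sketch of the idea, with a plain channel standing in for the watch (names here are illustrative, not from the test):

```go
package main

import "fmt"

type event struct {
	name  string
	phase string
}

// waitForAnyPhase consumes events until one matches the phase, no matter
// which object it belongs to; callers invoke it once per expected object.
func waitForAnyPhase(events <-chan event, phase string) event {
	for e := range events {
		if e.phase == phase {
			return e
		}
	}
	return event{} // channel closed without a match
}

func main() {
	events := make(chan event, 16)
	for i := 0; i < 5; i++ {
		events <- event{name: fmt.Sprintf("pvc-%d", i), phase: "Bound"}
	}
	close(events)

	// One wait per object: five Bound events mean all five claims are bound,
	// without caring about the order in which the events arrive.
	for i := 0; i < 5; i++ {
		e := waitForAnyPhase(events, "Bound")
		fmt.Printf("%d claims bound (last event: %s)\n", i+1, e.name)
	}
}
```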
```diff
@@ -239,12 +298,94 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
 	}
 	t.Log("claim deleted")
 
-	waitForPersistentVolumePhase(watchPV, api.VolumeReleased)
+	waitForAnyPersistentVolumePhase(watchPV, api.VolumeReleased)
 	t.Log("volumes released")
 
 	deleteAllEtcdKeys()
 }
 
+// TestPersistentVolumeMultiPVsPVCs tests binding of 100 PVCs to 100 PVs.
+// This test is configurable by KUBE_INTEGRATION_PV_* variables.
+func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
+	_, s := framework.RunAMaster(t)
+	defer s.Close()
+
+	deleteAllEtcdKeys()
+	testClient, binder, watchPV, watchPVC := createClients(t, s)
+	defer watchPV.Stop()
+	defer watchPVC.Stop()
+
+	binder.Run()
+	defer binder.Stop()
+
+	objCount := getObjectCount()
+	pvs := make([]*api.PersistentVolume, objCount)
+	pvcs := make([]*api.PersistentVolumeClaim, objCount)
+	for i := 0; i < objCount; i++ {
+		// This PV will be claimed, released, and deleted
+		pvs[i] = createPV("pv-"+strconv.Itoa(i), "/tmp/foo"+strconv.Itoa(i), "1G",
+			[]api.PersistentVolumeAccessMode{api.ReadWriteOnce}, api.PersistentVolumeReclaimRetain)
+		pvcs[i] = createPVC("pvc-"+strconv.Itoa(i), "1G", []api.PersistentVolumeAccessMode{api.ReadWriteOnce})
+	}
+
+	// Create PVs first
+	glog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: start")
+
+	// Create the volumes in a separate goroutine so events can be popped
+	// from watchPV early; the watch seems to have limited capacity and
+	// gets stuck with >3000 volumes.
+	go func() {
+		for i := 0; i < objCount; i++ {
+			_, _ = testClient.PersistentVolumes().Create(pvs[i])
+		}
+	}()
+	// Wait for them to get Available
+	for i := 0; i < objCount; i++ {
+		waitForAnyPersistentVolumePhase(watchPV, api.VolumeAvailable)
+		glog.V(1).Infof("%d volumes available", i+1)
+	}
+	glog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: volumes are Available")
+
+	// Create the claims, again in a separate goroutine.
+	go func() {
+		for i := 0; i < objCount; i++ {
+			_, _ = testClient.PersistentVolumeClaims(api.NamespaceDefault).Create(pvcs[i])
+		}
+	}()
+
+	// wait until the binder pairs all claims with volumes
+	for i := 0; i < objCount; i++ {
+		waitForAnyPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
+		glog.V(1).Infof("%d claims bound", i+1)
+	}
+	glog.V(2).Infof("TestPersistentVolumeMultiPVsPVCs: claims are bound")
+
+	// check that everything is bound to something
+	for i := 0; i < objCount; i++ {
+		pv, err := testClient.PersistentVolumes().Get(pvs[i].Name)
+		if err != nil {
+			t.Fatalf("Unexpected error getting pv: %v", err)
+		}
+		if pv.Spec.ClaimRef == nil {
+			t.Fatalf("PV %q is not bound", pv.Name)
+		}
+		glog.V(2).Infof("PV %q is bound to PVC %q", pv.Name, pv.Spec.ClaimRef.Name)
+
+		pvc, err := testClient.PersistentVolumeClaims(api.NamespaceDefault).Get(pvcs[i].Name)
+		if err != nil {
+			t.Fatalf("Unexpected error getting pvc: %v", err)
+		}
+		if pvc.Spec.VolumeName == "" {
+			t.Fatalf("PVC %q is not bound", pvc.Name)
+		}
+		glog.V(2).Infof("PVC %q is bound to PV %q", pvc.Name, pvc.Spec.VolumeName)
+	}
+	testSleep()
+	deleteAllEtcdKeys()
+}
+
 // TestPersistentVolumeMultiPVsDiffAccessModes tests binding of one PVC to two
 // PVs with different access modes.
 func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
 	_, s := framework.RunAMaster(t)
 	defer s.Close()
```
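The comment about watchPV's capacity explains the goroutines above: events must be drained while objects are still being created, or a bounded event stream can fill up and stall the producer. A minimal sketch of that producer/consumer split, again with an ordinary buffered channel standing in for the watch (the constants and names are illustrative):

```go
package main

import "fmt"

func main() {
	const objCount = 1000
	events := make(chan int, 100) // bounded, like a watch event stream

	// Producer: create objects in a separate goroutine so that the main
	// goroutine can drain events concurrently. If all objects were created
	// first and events read only afterwards, the bounded channel would fill
	// up after 100 entries and block the producer forever.
	go func() {
		for i := 0; i < objCount; i++ {
			events <- i // stands in for "object created, event emitted"
		}
		close(events)
	}()

	// Consumer: pop events as they arrive.
	drained := 0
	for range events {
		drained++
	}
	fmt.Printf("%d events drained\n", drained)
}
```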
```diff
@@ -282,9 +423,9 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
 	t.Log("claim created")
 
 	// wait until the controller pairs the volume and claim
-	waitForPersistentVolumePhase(watchPV, api.VolumeBound)
+	waitForAnyPersistentVolumePhase(watchPV, api.VolumeBound)
 	t.Log("volume bound")
-	waitForPersistentVolumeClaimPhase(watchPVC, api.ClaimBound)
+	waitForPersistentVolumeClaimPhase(testClient, pvc.Name, watchPVC, api.ClaimBound)
 	t.Log("claim bound")
 
 	// only RWM PV is bound
```
```diff
@@ -312,36 +453,88 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
 	}
 	t.Log("claim deleted")
 
-	waitForPersistentVolumePhase(watchPV, api.VolumeReleased)
+	waitForAnyPersistentVolumePhase(watchPV, api.VolumeReleased)
 	t.Log("volume released")
 
 	deleteAllEtcdKeys()
 }
 
-func waitForPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) {
+func waitForPersistentVolumePhase(client *clientset.Clientset, pvName string, w watch.Interface, phase api.PersistentVolumePhase) {
+	// Check if the volume is already in the requested phase
+	volume, err := client.Core().PersistentVolumes().Get(pvName)
+	if err == nil && volume.Status.Phase == phase {
+		return
+	}
+
+	// Wait for the phase
 	for {
 		event := <-w.ResultChan()
-		volume := event.Object.(*api.PersistentVolume)
-		if volume.Status.Phase == phase {
+		volume, ok := event.Object.(*api.PersistentVolume)
+		if !ok {
+			continue
+		}
+		if volume.Status.Phase == phase && volume.Name == pvName {
+			glog.V(2).Infof("volume %q is %s", volume.Name, phase)
 			break
 		}
 	}
 }
 
-func waitForPersistentVolumeClaimPhase(w watch.Interface, phase api.PersistentVolumeClaimPhase) {
+func waitForPersistentVolumeClaimPhase(client *clientset.Clientset, claimName string, w watch.Interface, phase api.PersistentVolumeClaimPhase) {
+	// Check if the claim is already in the requested phase
+	claim, err := client.Core().PersistentVolumeClaims(api.NamespaceDefault).Get(claimName)
+	if err == nil && claim.Status.Phase == phase {
+		return
+	}
+
+	// Wait for the phase
 	for {
 		event := <-w.ResultChan()
-		claim := event.Object.(*api.PersistentVolumeClaim)
-		if claim.Status.Phase == phase {
+		claim, ok := event.Object.(*api.PersistentVolumeClaim)
+		if !ok {
+			continue
+		}
+		if claim.Status.Phase == phase && claim.Name == claimName {
+			glog.V(2).Infof("claim %q is %s", claim.Name, phase)
 			break
 		}
 	}
 }
 
+func waitForAnyPersistentVolumePhase(w watch.Interface, phase api.PersistentVolumePhase) {
+	for {
+		event := <-w.ResultChan()
+		volume, ok := event.Object.(*api.PersistentVolume)
+		if !ok {
+			continue
+		}
+		if volume.Status.Phase == phase {
+			glog.V(2).Infof("volume %q is %s", volume.Name, phase)
+			break
+		}
+	}
+}
+
+func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase api.PersistentVolumeClaimPhase) {
+	for {
+		event := <-w.ResultChan()
+		claim, ok := event.Object.(*api.PersistentVolumeClaim)
+		if !ok {
+			continue
+		}
+		if claim.Status.Phase == phase {
+			glog.V(2).Infof("claim %q is %s", claim.Name, phase)
+			break
+		}
+	}
+}
+
 func createClients(t *testing.T, s *httptest.Server) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, watch.Interface, watch.Interface) {
-	// Use higher QPS and Burst, there is a test for race condition below, which
-	// creates many claims and default values were too low.
-	testClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000, Burst: 100000})
+	// Use higher QPS and Burst: there is a test for race conditions which
+	// creates many objects, and the default values were too low.
+	binderClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000000, Burst: 1000000})
+	testClient := clientset.NewForConfigOrDie(&restclient.Config{Host: s.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}, QPS: 1000000, Burst: 1000000})
 
 	host := volumetest.NewFakeVolumeHost("/tmp/fake", nil, nil)
 	plugins := []volume.VolumePlugin{&volumetest.FakeVolumePlugin{
 		PluginName: "plugin-name",
```
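The reworked `waitForPersistentVolumePhase` helpers first GET the object and return early if it is already in the requested phase; without that check, a waiter whose event was already consumed (for example by an earlier wait on the same channel) could block forever. A sketch of this check-then-watch pattern under the same caveats as above (generic types and illustrative names, not the test's API):

```go
package main

import "fmt"

type obj struct {
	name  string
	phase string
}

// waitForPhase returns once the named object reaches the phase: first via a
// direct lookup (its event may already have been consumed), then by watching.
func waitForPhase(get func(name string) obj, events <-chan obj, name, phase string) {
	// Fast path: the object may already be in the requested phase.
	if o := get(name); o.phase == phase {
		return
	}
	// Slow path: filter the event stream by name and phase.
	for o := range events {
		if o.name == name && o.phase == phase {
			return
		}
	}
}

func main() {
	events := make(chan obj, 1)
	events <- obj{name: "pv-1", phase: "Bound"}

	// The lookup still reports Pending, so the helper falls back to the watch.
	lookup := func(name string) obj { return obj{name: name, phase: "Pending"} }
	waitForPhase(lookup, events, "pv-1", "Bound")
	fmt.Println("pv-1 reached Bound")
}
```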
```diff
@@ -356,7 +549,9 @@ func createClients(t *testing.T, s *httptest.Server) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, watch.Interface, watch.Interface) {
 		Detachers: nil,
 	}}
 	cloud := &fake_cloud.FakeCloud{}
-	ctrl := persistentvolumecontroller.NewPersistentVolumeController(testClient, 10*time.Second, nil, plugins, cloud, "", nil, nil, nil)
+
+	syncPeriod := getSyncPeriod()
+	ctrl := persistentvolumecontroller.NewPersistentVolumeController(binderClient, syncPeriod, nil, plugins, cloud, "", nil, nil, nil)
 
 	watchPV, err := testClient.PersistentVolumes().Watch(api.ListOptions{})
 	if err != nil {
```
```diff
@@ -384,7 +579,10 @@ func createPV(name, path, cap string, mode []api.PersistentVolumeAccessMode, rec
 
 func createPVC(name, cap string, mode []api.PersistentVolumeAccessMode) *api.PersistentVolumeClaim {
 	return &api.PersistentVolumeClaim{
-		ObjectMeta: api.ObjectMeta{Name: name},
+		ObjectMeta: api.ObjectMeta{
+			Name:      name,
+			Namespace: api.NamespaceDefault,
+		},
 		Spec: api.PersistentVolumeClaimSpec{
 			Resources:   api.ResourceRequirements{Requests: api.ResourceList{api.ResourceName(api.ResourceStorage): resource.MustParse(cap)}},
 			AccessModes: mode,
```