2016-05-17 12:55:07 +00:00
|
|
|
/*
|
|
|
|
Copyright 2016 The Kubernetes Authors All rights reserved.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package persistentvolume
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"reflect"
|
|
|
|
"strconv"
|
2016-05-17 12:55:14 +00:00
|
|
|
"strings"
|
2016-05-17 12:55:07 +00:00
|
|
|
"sync"
|
2016-05-17 12:55:22 +00:00
|
|
|
"sync/atomic"
|
2016-05-17 12:55:07 +00:00
|
|
|
"testing"
|
2016-05-17 12:55:15 +00:00
|
|
|
"time"
|
2016-05-17 12:55:07 +00:00
|
|
|
|
|
|
|
"github.com/golang/glog"
|
|
|
|
|
|
|
|
"k8s.io/kubernetes/pkg/api"
|
|
|
|
"k8s.io/kubernetes/pkg/api/resource"
|
|
|
|
"k8s.io/kubernetes/pkg/api/testapi"
|
2016-05-19 20:52:29 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
2016-05-17 12:55:07 +00:00
|
|
|
"k8s.io/kubernetes/pkg/client/cache"
|
|
|
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
|
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
|
2016-05-17 12:55:14 +00:00
|
|
|
"k8s.io/kubernetes/pkg/client/record"
|
2016-05-17 12:55:07 +00:00
|
|
|
"k8s.io/kubernetes/pkg/client/testing/core"
|
|
|
|
"k8s.io/kubernetes/pkg/controller/framework"
|
|
|
|
"k8s.io/kubernetes/pkg/conversion"
|
|
|
|
"k8s.io/kubernetes/pkg/runtime"
|
|
|
|
"k8s.io/kubernetes/pkg/types"
|
|
|
|
"k8s.io/kubernetes/pkg/util/diff"
|
2016-05-17 12:55:22 +00:00
|
|
|
vol "k8s.io/kubernetes/pkg/volume"
|
2016-05-17 12:55:07 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// This is a unit test framework for persistent volume controller.
|
|
|
|
// It fills the controller with test claims/volumes and can simulate these
|
|
|
|
// scenarios:
|
|
|
|
// 1) Call syncClaim/syncVolume once.
|
|
|
|
// 2) Call syncClaim/syncVolume several times (both simulating "claim/volume
|
|
|
|
// modified" events and periodic sync), until the controller settles down and
|
|
|
|
// does not modify anything.
|
|
|
|
// 3) Simulate almost real API server/etcd and call add/update/delete
|
|
|
|
// volume/claim.
|
|
|
|
// In all these scenarios, when the test finishes, the framework can compare
|
|
|
|
// resulting claims/volumes with list of expected claims/volumes and report
|
|
|
|
// differences.
|
|
|
|
|
|
|
|
// controllerTest contains a single controller test input.
|
|
|
|
// Each test has initial set of volumes and claims that are filled into the
|
|
|
|
// controller before the test starts. The test then contains a reference to
|
|
|
|
// function to call as the actual test. Available functions are:
|
|
|
|
// - testSyncClaim - calls syncClaim on the first claim in initialClaims.
|
|
|
|
// - testSyncClaimError - calls syncClaim on the first claim in initialClaims
|
|
|
|
// and expects an error to be returned.
|
|
|
|
// - testSyncVolume - calls syncVolume on the first volume in initialVolumes.
|
|
|
|
// - any custom function for specialized tests.
|
|
|
|
// The test then contains list of volumes/claims that are expected at the end
|
2016-05-17 12:55:14 +00:00
|
|
|
// of the test and list of generated events.
|
2016-05-17 12:55:07 +00:00
|
|
|
type controllerTest struct {
	// Name of the test, for logging
	name string
	// Initial content of controller volume cache.
	initialVolumes []*api.PersistentVolume
	// Expected content of controller volume cache at the end of the test.
	expectedVolumes []*api.PersistentVolume
	// Initial content of controller claim cache.
	initialClaims []*api.PersistentVolumeClaim
	// Expected content of controller claim cache at the end of the test.
	expectedClaims []*api.PersistentVolumeClaim
	// Expected events - any event with prefix will pass, we don't check full
	// event message.
	expectedEvents []string
	// Errors to produce on matching action (consumed in order by the
	// reactor; see reactorError).
	errors []reactorError
	// Function to call as the test.
	test testCall
}
|
|
|
|
|
|
|
|
// testCall is the signature of a single test scenario; it receives the
// controller under test, the fake API-server reactor, and the test data.
type testCall func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error

// testNamespace is the namespace used for all test claims.
const testNamespace = "default"

// mockPluginName is the plugin name reported by mockVolumePlugin.
const mockPluginName = "MockVolumePlugin"

// versionConflictError simulates etcd's optimistic-concurrency failure.
var versionConflictError = errors.New("VersionError")

// Convenience empty sets for tests that expect no volumes/claims/events/errors.
var novolumes []*api.PersistentVolume
var noclaims []*api.PersistentVolumeClaim
var noevents = []string{}
var noerrors = []reactorError{}
|
2016-05-17 12:55:07 +00:00
|
|
|
|
|
|
|
// volumeReactor is a core.Reactor that simulates etcd and API server. It
|
|
|
|
// stores:
|
|
|
|
// - Latest version of claims volumes saved by the controller.
|
|
|
|
// - Queue of all saves (to simulate "volume/claim updated" events). This queue
|
|
|
|
// contains all intermediate state of an object - e.g. a claim.VolumeName
|
|
|
|
// is updated first and claim.Phase second. This queue will then contain both
|
|
|
|
// updates as separate entries.
|
|
|
|
// - Number of changes since the last call to volumeReactor.syncAll().
|
|
|
|
// - Optionally, volume and claim event sources. When set, all changed
|
|
|
|
// volumes/claims are sent as Modify event to these sources. These sources can
|
|
|
|
// be linked back to the controller watcher as "volume/claim updated" events.
|
2016-05-17 12:55:28 +00:00
|
|
|
// - Optionally, list of error that should be returned by reactor, simulating
|
|
|
|
// etcd / API server failures. These errors are evaluated in order and every
|
|
|
|
// error is returned only once. I.e. when the reactor finds matching
|
|
|
|
// reactorError, it return appropriate error and removes the reactorError from
|
|
|
|
// the list.
|
2016-05-17 12:55:07 +00:00
|
|
|
type volumeReactor struct {
	// Latest version of each volume, keyed by name.
	volumes map[string]*api.PersistentVolume
	// Latest version of each claim, keyed by name.
	claims map[string]*api.PersistentVolumeClaim
	// Queue of all saves, including intermediate states of an object.
	changedObjects []interface{}
	// Number of changes since the last call to syncAll().
	changedSinceLastSync int
	ctrl                 *PersistentVolumeController
	// Optional event sources; when set, changed objects are sent to them.
	volumeSource *framework.FakeControllerSource
	claimSource  *framework.FakeControllerSource
	// Guards all fields above against concurrent access by the controller.
	lock sync.Mutex
	// Errors to inject on matching actions; each is returned at most once.
	errors []reactorError
}
|
|
|
|
|
|
|
|
// reactorError is an error that is returned by test reactor (=simulated
|
|
|
|
// etcd+/API server) when an action performed by the reactor matches given verb
|
|
|
|
// ("get", "update", "create", "delete" or "*"") on given resource
|
|
|
|
// ("persistentvolumes", "persistentvolumeclaims" or "*").
|
|
|
|
type reactorError struct {
	// Verb to match: "get", "update", "create", "delete" or "*".
	verb string
	// Resource to match: "persistentvolumes", "persistentvolumeclaims" or "*".
	resource string
	// Error to return when the action matches.
	error error
}
|
|
|
|
|
|
|
|
// React is a callback called by fake kubeClient from the controller.
|
|
|
|
// In other words, every claim/volume change performed by the controller ends
|
|
|
|
// here.
|
|
|
|
// This callback checks versions of the updated objects and refuse those that
|
|
|
|
// are too old (simulating real etcd).
|
|
|
|
// All updated objects are stored locally to keep track of object versions and
|
|
|
|
// to evaluate test results.
|
|
|
|
// All updated objects are also inserted into changedObjects queue and
|
|
|
|
// optionally sent back to the controller via its watchers.
|
|
|
|
func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Object, err error) {
|
|
|
|
r.lock.Lock()
|
|
|
|
defer r.lock.Unlock()
|
|
|
|
|
|
|
|
glog.V(4).Infof("reactor got operation %q on %q", action.GetVerb(), action.GetResource())
|
|
|
|
|
2016-05-17 12:55:28 +00:00
|
|
|
// Inject error when requested
|
|
|
|
err = r.injectReactError(action)
|
|
|
|
if err != nil {
|
|
|
|
return true, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Test did not requst to inject an error, continue simulating API server.
|
2016-05-17 12:55:07 +00:00
|
|
|
switch {
|
2016-05-17 12:55:26 +00:00
|
|
|
case action.Matches("create", "persistentvolumes"):
|
|
|
|
obj := action.(core.UpdateAction).GetObject()
|
|
|
|
volume := obj.(*api.PersistentVolume)
|
|
|
|
|
|
|
|
// check the volume does not exist
|
|
|
|
_, found := r.volumes[volume.Name]
|
|
|
|
if found {
|
|
|
|
return true, nil, fmt.Errorf("Cannot create volume %s: volume already exists", volume.Name)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Store the updated object to appropriate places.
|
|
|
|
if r.volumeSource != nil {
|
|
|
|
r.volumeSource.Add(volume)
|
|
|
|
}
|
|
|
|
r.volumes[volume.Name] = volume
|
|
|
|
r.changedObjects = append(r.changedObjects, volume)
|
|
|
|
r.changedSinceLastSync++
|
|
|
|
glog.V(4).Infof("created volume %s", volume.Name)
|
|
|
|
return true, volume, nil
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
case action.Matches("update", "persistentvolumes"):
|
|
|
|
obj := action.(core.UpdateAction).GetObject()
|
|
|
|
volume := obj.(*api.PersistentVolume)
|
|
|
|
|
|
|
|
// Check and bump object version
|
|
|
|
storedVolume, found := r.volumes[volume.Name]
|
|
|
|
if found {
|
|
|
|
storedVer, _ := strconv.Atoi(storedVolume.ResourceVersion)
|
|
|
|
requestedVer, _ := strconv.Atoi(volume.ResourceVersion)
|
|
|
|
if storedVer != requestedVer {
|
|
|
|
return true, obj, versionConflictError
|
|
|
|
}
|
|
|
|
volume.ResourceVersion = strconv.Itoa(storedVer + 1)
|
2016-05-17 12:55:15 +00:00
|
|
|
} else {
|
|
|
|
return true, nil, fmt.Errorf("Cannot update volume %s: volume not found", volume.Name)
|
2016-05-17 12:55:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Store the updated object to appropriate places.
|
|
|
|
if r.volumeSource != nil {
|
|
|
|
r.volumeSource.Modify(volume)
|
|
|
|
}
|
|
|
|
r.volumes[volume.Name] = volume
|
|
|
|
r.changedObjects = append(r.changedObjects, volume)
|
|
|
|
r.changedSinceLastSync++
|
|
|
|
glog.V(4).Infof("saved updated volume %s", volume.Name)
|
|
|
|
return true, volume, nil
|
|
|
|
|
|
|
|
case action.Matches("update", "persistentvolumeclaims"):
|
|
|
|
obj := action.(core.UpdateAction).GetObject()
|
|
|
|
claim := obj.(*api.PersistentVolumeClaim)
|
|
|
|
|
|
|
|
// Check and bump object version
|
|
|
|
storedClaim, found := r.claims[claim.Name]
|
|
|
|
if found {
|
|
|
|
storedVer, _ := strconv.Atoi(storedClaim.ResourceVersion)
|
|
|
|
requestedVer, _ := strconv.Atoi(claim.ResourceVersion)
|
|
|
|
if storedVer != requestedVer {
|
|
|
|
return true, obj, versionConflictError
|
|
|
|
}
|
|
|
|
claim.ResourceVersion = strconv.Itoa(storedVer + 1)
|
2016-05-17 12:55:15 +00:00
|
|
|
} else {
|
|
|
|
return true, nil, fmt.Errorf("Cannot update claim %s: claim not found", claim.Name)
|
2016-05-17 12:55:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Store the updated object to appropriate places.
|
|
|
|
r.claims[claim.Name] = claim
|
|
|
|
if r.claimSource != nil {
|
|
|
|
r.claimSource.Modify(claim)
|
|
|
|
}
|
|
|
|
r.changedObjects = append(r.changedObjects, claim)
|
|
|
|
r.changedSinceLastSync++
|
|
|
|
glog.V(4).Infof("saved updated claim %s", claim.Name)
|
|
|
|
return true, claim, nil
|
2016-05-17 12:55:22 +00:00
|
|
|
|
|
|
|
case action.Matches("get", "persistentvolumes"):
|
|
|
|
name := action.(core.GetAction).GetName()
|
|
|
|
volume, found := r.volumes[name]
|
|
|
|
if found {
|
|
|
|
glog.V(4).Infof("GetVolume: found %s", volume.Name)
|
|
|
|
return true, volume, nil
|
|
|
|
} else {
|
|
|
|
glog.V(4).Infof("GetVolume: volume %s not found", name)
|
|
|
|
return true, nil, fmt.Errorf("Cannot find volume %s", name)
|
|
|
|
}
|
2016-05-17 12:55:23 +00:00
|
|
|
|
|
|
|
case action.Matches("delete", "persistentvolumes"):
|
|
|
|
name := action.(core.DeleteAction).GetName()
|
|
|
|
glog.V(4).Infof("deleted volume %s", name)
|
|
|
|
_, found := r.volumes[name]
|
|
|
|
if found {
|
|
|
|
delete(r.volumes, name)
|
|
|
|
return true, nil, nil
|
|
|
|
} else {
|
|
|
|
return true, nil, fmt.Errorf("Cannot delete volume %s: not found", name)
|
|
|
|
}
|
|
|
|
|
|
|
|
case action.Matches("delete", "persistentvolumeclaims"):
|
|
|
|
name := action.(core.DeleteAction).GetName()
|
|
|
|
glog.V(4).Infof("deleted claim %s", name)
|
|
|
|
_, found := r.volumes[name]
|
|
|
|
if found {
|
|
|
|
delete(r.claims, name)
|
|
|
|
return true, nil, nil
|
|
|
|
} else {
|
|
|
|
return true, nil, fmt.Errorf("Cannot delete claim %s: not found", name)
|
|
|
|
}
|
2016-05-17 12:55:07 +00:00
|
|
|
}
|
2016-05-17 12:55:23 +00:00
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
return false, nil, nil
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:28 +00:00
|
|
|
// injectReactError returns an error when the test requested given action to
|
|
|
|
// fail. nil is returned otherwise.
|
|
|
|
// injectReactError returns an error when the test requested given action to
// fail. nil is returned otherwise.
func (r *volumeReactor) injectReactError(action core.Action) error {
	if len(r.errors) == 0 {
		// No more errors to inject, everything should succeed.
		return nil
	}

	for i, expected := range r.errors {
		glog.V(4).Infof("trying to match %q %q with %q %q", expected.verb, expected.resource, action.GetVerb(), action.GetResource())
		if action.Matches(expected.verb, expected.resource) {
			// That's the action we're waiting for, remove it from injectedErrors
			// (safe despite mutating during range: we return immediately).
			r.errors = append(r.errors[:i], r.errors[i+1:]...)
			glog.V(4).Infof("reactor found matching error at index %d: %q %q, returning %v", i, expected.verb, expected.resource, expected.error)
			return expected.error
		}
	}
	return nil
}
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
// checkVolumes compares all expectedVolumes with set of volumes at the end of
|
|
|
|
// the test and reports differences.
|
|
|
|
func (r *volumeReactor) checkVolumes(t *testing.T, expectedVolumes []*api.PersistentVolume) error {
|
|
|
|
r.lock.Lock()
|
|
|
|
defer r.lock.Unlock()
|
|
|
|
|
|
|
|
expectedMap := make(map[string]*api.PersistentVolume)
|
|
|
|
gotMap := make(map[string]*api.PersistentVolume)
|
|
|
|
// Clear any ResourceVersion from both sets
|
|
|
|
for _, v := range expectedVolumes {
|
|
|
|
v.ResourceVersion = ""
|
|
|
|
expectedMap[v.Name] = v
|
|
|
|
}
|
|
|
|
for _, v := range r.volumes {
|
|
|
|
// We must clone the volume because of golang race check - it was
|
|
|
|
// written by the controller without any locks on it.
|
|
|
|
clone, _ := conversion.NewCloner().DeepCopy(v)
|
|
|
|
v = clone.(*api.PersistentVolume)
|
|
|
|
v.ResourceVersion = ""
|
|
|
|
if v.Spec.ClaimRef != nil {
|
|
|
|
v.Spec.ClaimRef.ResourceVersion = ""
|
|
|
|
}
|
|
|
|
gotMap[v.Name] = v
|
|
|
|
}
|
|
|
|
if !reflect.DeepEqual(expectedMap, gotMap) {
|
|
|
|
// Print ugly but useful diff of expected and received objects for
|
|
|
|
// easier debugging.
|
|
|
|
return fmt.Errorf("Volume check failed [A-expected, B-got]: %s", diff.ObjectDiff(expectedMap, gotMap))
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// checkClaims compares all expectedClaims with set of claims at the end of the
|
|
|
|
// test and reports differences.
|
|
|
|
func (r *volumeReactor) checkClaims(t *testing.T, expectedClaims []*api.PersistentVolumeClaim) error {
|
|
|
|
r.lock.Lock()
|
|
|
|
defer r.lock.Unlock()
|
|
|
|
|
|
|
|
expectedMap := make(map[string]*api.PersistentVolumeClaim)
|
|
|
|
gotMap := make(map[string]*api.PersistentVolumeClaim)
|
|
|
|
for _, c := range expectedClaims {
|
|
|
|
c.ResourceVersion = ""
|
|
|
|
expectedMap[c.Name] = c
|
|
|
|
}
|
|
|
|
for _, c := range r.claims {
|
|
|
|
// We must clone the claim because of golang race check - it was
|
|
|
|
// written by the controller without any locks on it.
|
|
|
|
clone, _ := conversion.NewCloner().DeepCopy(c)
|
|
|
|
c = clone.(*api.PersistentVolumeClaim)
|
|
|
|
c.ResourceVersion = ""
|
|
|
|
gotMap[c.Name] = c
|
|
|
|
}
|
|
|
|
if !reflect.DeepEqual(expectedMap, gotMap) {
|
|
|
|
// Print ugly but useful diff of expected and received objects for
|
|
|
|
// easier debugging.
|
|
|
|
return fmt.Errorf("Claim check failed [A-expected, B-got result]: %s", diff.ObjectDiff(expectedMap, gotMap))
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:14 +00:00
|
|
|
// checkEvents compares all expectedEvents with events generated during the test
|
|
|
|
// and reports differences.
|
|
|
|
func checkEvents(t *testing.T, expectedEvents []string, ctrl *PersistentVolumeController) error {
|
|
|
|
var err error
|
|
|
|
|
2016-06-01 08:12:47 +00:00
|
|
|
// Read recorded events - wait up to 1 minute to get all the expected ones
|
|
|
|
// (just in case some goroutines are slower with writing)
|
|
|
|
timer := time.NewTimer(time.Minute)
|
|
|
|
|
2016-05-17 12:55:14 +00:00
|
|
|
fakeRecorder := ctrl.eventRecorder.(*record.FakeRecorder)
|
|
|
|
gotEvents := []string{}
|
|
|
|
finished := false
|
2016-06-01 08:12:47 +00:00
|
|
|
for len(gotEvents) < len(expectedEvents) && !finished {
|
2016-05-17 12:55:14 +00:00
|
|
|
select {
|
|
|
|
case event, ok := <-fakeRecorder.Events:
|
|
|
|
if ok {
|
|
|
|
glog.V(5).Infof("event recorder got event %s", event)
|
|
|
|
gotEvents = append(gotEvents, event)
|
|
|
|
} else {
|
|
|
|
glog.V(5).Infof("event recorder finished")
|
|
|
|
finished = true
|
|
|
|
}
|
2016-06-01 08:12:47 +00:00
|
|
|
case _, _ = <-timer.C:
|
|
|
|
glog.V(5).Infof("event recorder timeout")
|
2016-05-17 12:55:14 +00:00
|
|
|
finished = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Evaluate the events
|
|
|
|
for i, expected := range expectedEvents {
|
|
|
|
if len(gotEvents) <= i {
|
|
|
|
t.Errorf("Event %q not emitted", expected)
|
|
|
|
err = fmt.Errorf("Events do not match")
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
received := gotEvents[i]
|
|
|
|
if !strings.HasPrefix(received, expected) {
|
|
|
|
t.Errorf("Unexpected event received, expected %q, got %q", expected, received)
|
|
|
|
err = fmt.Errorf("Events do not match")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for i := len(expectedEvents); i < len(gotEvents); i++ {
|
|
|
|
t.Errorf("Unexpected event received: %q", gotEvents[i])
|
|
|
|
err = fmt.Errorf("Events do not match")
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
// popChange returns one recorded updated object, either *api.PersistentVolume
|
|
|
|
// or *api.PersistentVolumeClaim. Returns nil when there are no changes.
|
|
|
|
func (r *volumeReactor) popChange() interface{} {
|
|
|
|
r.lock.Lock()
|
|
|
|
defer r.lock.Unlock()
|
|
|
|
|
|
|
|
if len(r.changedObjects) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// For debugging purposes, print the queue
|
|
|
|
for _, obj := range r.changedObjects {
|
|
|
|
switch obj.(type) {
|
|
|
|
case *api.PersistentVolume:
|
|
|
|
vol, _ := obj.(*api.PersistentVolume)
|
|
|
|
glog.V(4).Infof("reactor queue: %s", vol.Name)
|
|
|
|
case *api.PersistentVolumeClaim:
|
|
|
|
claim, _ := obj.(*api.PersistentVolumeClaim)
|
|
|
|
glog.V(4).Infof("reactor queue: %s", claim.Name)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Pop the first item from the queue and return it
|
|
|
|
obj := r.changedObjects[0]
|
|
|
|
r.changedObjects = r.changedObjects[1:]
|
|
|
|
return obj
|
|
|
|
}
|
|
|
|
|
|
|
|
// syncAll simulates the controller periodic sync of volumes and claim. It
|
|
|
|
// simply adds all these objects to the internal queue of updates. This method
|
|
|
|
// should be used when the test manually calls syncClaim/syncVolume. Test that
|
|
|
|
// use real controller loop (ctrl.Run()) will get periodic sync automatically.
|
|
|
|
func (r *volumeReactor) syncAll() {
|
|
|
|
r.lock.Lock()
|
|
|
|
defer r.lock.Unlock()
|
|
|
|
|
|
|
|
for _, c := range r.claims {
|
|
|
|
r.changedObjects = append(r.changedObjects, c)
|
|
|
|
}
|
|
|
|
for _, v := range r.volumes {
|
|
|
|
r.changedObjects = append(r.changedObjects, v)
|
|
|
|
}
|
|
|
|
r.changedSinceLastSync = 0
|
|
|
|
}
|
|
|
|
|
|
|
|
// getChangeCount returns the number of changes recorded since the last
// syncAll(), taking the reactor lock for a consistent read.
func (r *volumeReactor) getChangeCount() int {
	r.lock.Lock()
	defer r.lock.Unlock()
	return r.changedSinceLastSync
}
|
|
|
|
|
2016-05-17 12:55:19 +00:00
|
|
|
// getOperationCount returns the number of asynchronous operations currently
// running in the controller, guarded by the controller's own operations lock.
func (r *volumeReactor) getOperationCount() int {
	r.ctrl.runningOperationsMapLock.Lock()
	defer r.ctrl.runningOperationsMapLock.Unlock()
	return len(r.ctrl.runningOperations)
}
|
|
|
|
|
2016-05-17 12:55:15 +00:00
|
|
|
// waitTest waits until all tests, controllers and other goroutines do their
|
|
|
|
// job and no new actions are registered for 10 milliseconds.
|
|
|
|
func (r *volumeReactor) waitTest() {
|
|
|
|
// Check every 10ms if the controller does something and stop if it's
|
|
|
|
// idle.
|
|
|
|
oldChanges := -1
|
|
|
|
for {
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
changes := r.getChangeCount()
|
2016-05-17 12:55:19 +00:00
|
|
|
if changes == oldChanges && r.getOperationCount() == 0 {
|
2016-05-17 12:55:15 +00:00
|
|
|
// No changes for last 10ms -> controller must be idle.
|
|
|
|
break
|
|
|
|
}
|
|
|
|
oldChanges = changes
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-06-01 06:35:33 +00:00
|
|
|
// deleteVolumeEvent simulates that a volume has been deleted in etcd and
|
|
|
|
// the controller receives 'volume deleted' event.
|
|
|
|
// deleteVolumeEvent simulates that a volume has been deleted in etcd and
// the controller receives 'volume deleted' event.
func (r *volumeReactor) deleteVolumeEvent(volume *api.PersistentVolume) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Remove the volume from list of resulting volumes.
	delete(r.volumes, volume.Name)

	// Generate deletion event. Cloned volume is needed to prevent races (and we
	// would get a clone from etcd too).
	clone, _ := conversion.NewCloner().DeepCopy(volume)
	volumeClone := clone.(*api.PersistentVolume)
	r.volumeSource.Delete(volumeClone)
}
|
|
|
|
|
|
|
|
// deleteClaimEvent simulates that a claim has been deleted in etcd and the
|
|
|
|
// controller receives 'claim deleted' event.
|
|
|
|
// deleteClaimEvent simulates that a claim has been deleted in etcd and the
// controller receives 'claim deleted' event.
func (r *volumeReactor) deleteClaimEvent(claim *api.PersistentVolumeClaim) {
	r.lock.Lock()
	defer r.lock.Unlock()

	// Remove the claim from list of resulting claims.
	delete(r.claims, claim.Name)

	// Generate deletion event. Cloned claim is needed to prevent races (and we
	// would get a clone from etcd too).
	clone, _ := conversion.NewCloner().DeepCopy(claim)
	claimClone := clone.(*api.PersistentVolumeClaim)
	r.claimSource.Delete(claimClone)
}
|
|
|
|
|
|
|
|
// addVolumeEvent simulates that a volume has been added in etcd and the
|
|
|
|
// controller receives 'volume added' event.
|
|
|
|
// addVolumeEvent simulates that a volume has been added in etcd and the
// controller receives 'volume added' event.
func (r *volumeReactor) addVolumeEvent(volume *api.PersistentVolume) {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.volumes[volume.Name] = volume
	// Generate event. No cloning is needed, this volume is not stored in the
	// controller cache yet.
	r.volumeSource.Add(volume)
}
|
|
|
|
|
|
|
|
// addClaimEvent simulates that a claim has been deleted in etcd and the
|
|
|
|
// controller receives 'claim added' event.
|
|
|
|
// addClaimEvent simulates that a claim has been added in etcd and the
// controller receives 'claim added' event.
func (r *volumeReactor) addClaimEvent(claim *api.PersistentVolumeClaim) {
	r.lock.Lock()
	defer r.lock.Unlock()

	r.claims[claim.Name] = claim
	// Generate event. No cloning is needed, this claim is not stored in the
	// controller cache yet.
	r.claimSource.Add(claim)
}
|
|
|
|
|
2016-05-17 12:55:28 +00:00
|
|
|
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource, claimSource *framework.FakeControllerSource, errors []reactorError) *volumeReactor {
|
2016-05-17 12:55:07 +00:00
|
|
|
reactor := &volumeReactor{
|
|
|
|
volumes: make(map[string]*api.PersistentVolume),
|
|
|
|
claims: make(map[string]*api.PersistentVolumeClaim),
|
|
|
|
ctrl: ctrl,
|
|
|
|
volumeSource: volumeSource,
|
|
|
|
claimSource: claimSource,
|
2016-05-17 12:55:28 +00:00
|
|
|
errors: errors,
|
2016-05-17 12:55:07 +00:00
|
|
|
}
|
|
|
|
client.AddReactor("*", "*", reactor.React)
|
|
|
|
return reactor
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:34 +00:00
|
|
|
// newTestController creates a PersistentVolumeController wired to fake
// volume/claim sources and a fake event recorder, with timing tuned for tests.
// Nil sources are replaced with fresh FakeControllerSources.
func newTestController(kubeClient clientset.Interface, volumeSource, claimSource cache.ListerWatcher) *PersistentVolumeController {
	if volumeSource == nil {
		volumeSource = framework.NewFakeControllerSource()
	}
	if claimSource == nil {
		claimSource = framework.NewFakeControllerSource()
	}
	ctrl := NewPersistentVolumeController(
		kubeClient,
		5*time.Second, // sync period
		nil,           // provisioner
		[]vol.VolumePlugin{}, // recyclers
		nil, // cloud
		"",
		volumeSource,
		claimSource,
		record.NewFakeRecorder(1000), // event recorder
	)

	// Speed up the test
	ctrl.createProvisionedPVInterval = 5 * time.Millisecond
	return ctrl
}
|
|
|
|
|
2016-05-17 12:55:22 +00:00
|
|
|
// addRecyclePlugin installs a mock volume plugin whose Recycle() calls return
// the given errors in order.
func addRecyclePlugin(ctrl *PersistentVolumeController, expectedRecycleCalls []error) {
	plugin := &mockVolumePlugin{
		recycleCalls: expectedRecycleCalls,
	}
	ctrl.recyclePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, ctrl)
}
|
|
|
|
|
|
|
|
// addDeletePlugin installs a mock volume plugin whose Delete() calls return
// the given errors in order.
func addDeletePlugin(ctrl *PersistentVolumeController, expectedDeleteCalls []error) {
	plugin := &mockVolumePlugin{
		deleteCalls: expectedDeleteCalls,
	}
	ctrl.recyclePluginMgr.InitPlugins([]vol.VolumePlugin{plugin}, ctrl)
}
|
|
|
|
|
2016-05-17 12:55:26 +00:00
|
|
|
func addProvisionPlugin(ctrl *PersistentVolumeController, expectedDeleteCalls []error) {
|
|
|
|
plugin := &mockVolumePlugin{
|
|
|
|
provisionCalls: expectedDeleteCalls,
|
|
|
|
}
|
|
|
|
ctrl.provisioner = plugin
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
// newVolume returns a new volume with given attributes
|
2016-05-17 12:55:22 +00:00
|
|
|
// newVolume returns a new volume with given attributes
func newVolume(name, capacity, boundToClaimUID, boundToClaimName string, phase api.PersistentVolumePhase, reclaimPolicy api.PersistentVolumeReclaimPolicy, annotations ...string) *api.PersistentVolume {
	volume := api.PersistentVolume{
		ObjectMeta: api.ObjectMeta{
			Name:            name,
			ResourceVersion: "1",
		},
		Spec: api.PersistentVolumeSpec{
			Capacity: api.ResourceList{
				api.ResourceName(api.ResourceStorage): resource.MustParse(capacity),
			},
			// Arbitrary concrete source; tests don't exercise the disk itself.
			PersistentVolumeSource: api.PersistentVolumeSource{
				GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
			},
			AccessModes:                   []api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany},
			PersistentVolumeReclaimPolicy: reclaimPolicy,
		},
		Status: api.PersistentVolumeStatus{
			Phase: phase,
		},
	}

	// Bind the volume to a claim when a claim name was given.
	if boundToClaimName != "" {
		volume.Spec.ClaimRef = &api.ObjectReference{
			Kind:       "PersistentVolumeClaim",
			APIVersion: "v1",
			UID:        types.UID(boundToClaimUID),
			Namespace:  testNamespace,
			Name:       boundToClaimName,
		}
	}

	// Annotations get value "yes", except annDynamicallyProvisioned which
	// carries the provisioner plugin name.
	if len(annotations) > 0 {
		volume.Annotations = make(map[string]string)
		for _, a := range annotations {
			if a != annDynamicallyProvisioned {
				volume.Annotations[a] = "yes"
			} else {
				volume.Annotations[a] = mockPluginName
			}
		}
	}

	return &volume
}
|
|
|
|
|
2016-05-19 20:52:29 +00:00
|
|
|
// withLabels applies the given labels to the first volume in the array and
|
|
|
|
// returns the array. Meant to be used to compose volumes specified inline in
|
|
|
|
// a test.
|
|
|
|
// withLabels applies the given labels to the first volume in the array and
// returns the array. Meant to be used to compose volumes specified inline in
// a test.
func withLabels(labels map[string]string, volumes []*api.PersistentVolume) []*api.PersistentVolume {
	volumes[0].Labels = labels
	return volumes
}
|
|
|
|
|
|
|
|
// withLabelSelector sets the label selector of the first claim in the array
|
|
|
|
// to be MatchLabels of the given label set and returns the array. Meant
|
|
|
|
// to be used to compose claims specified inline in a test.
|
|
|
|
// withLabelSelector sets the label selector of the first claim in the array
// to be MatchLabels of the given label set and returns the array. Meant
// to be used to compose claims specified inline in a test.
func withLabelSelector(labels map[string]string, claims []*api.PersistentVolumeClaim) []*api.PersistentVolumeClaim {
	claims[0].Spec.Selector = &unversioned.LabelSelector{
		MatchLabels: labels,
	}

	return claims
}
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
// newVolumeArray returns array with a single volume that would be returned by
|
|
|
|
// newVolume() with the same parameters.
|
2016-05-17 12:55:22 +00:00
|
|
|
// newVolumeArray returns array with a single volume that would be returned by
// newVolume() with the same parameters.
func newVolumeArray(name, capacity, boundToClaimUID, boundToClaimName string, phase api.PersistentVolumePhase, reclaimPolicy api.PersistentVolumeReclaimPolicy, annotations ...string) []*api.PersistentVolume {
	return []*api.PersistentVolume{
		newVolume(name, capacity, boundToClaimUID, boundToClaimName, phase, reclaimPolicy, annotations...),
	}
}
|
|
|
|
|
|
|
|
// newClaim returns a new claim with given attributes
|
|
|
|
// newClaim returns a new claim with given attributes
func newClaim(name, claimUID, capacity, boundToVolume string, phase api.PersistentVolumeClaimPhase, annotations ...string) *api.PersistentVolumeClaim {
	claim := api.PersistentVolumeClaim{
		ObjectMeta: api.ObjectMeta{
			Name:            name,
			Namespace:       testNamespace,
			UID:             types.UID(claimUID),
			ResourceVersion: "1",
		},
		Spec: api.PersistentVolumeClaimSpec{
			AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce, api.ReadOnlyMany},
			Resources: api.ResourceRequirements{
				Requests: api.ResourceList{
					api.ResourceName(api.ResourceStorage): resource.MustParse(capacity),
				},
			},
			VolumeName: boundToVolume,
		},
		Status: api.PersistentVolumeClaimStatus{
			Phase: phase,
		},
	}
	// Make sure api.GetReference(claim) works
	claim.ObjectMeta.SelfLink = testapi.Default.SelfLink("pvc", name)

	// All annotations get the placeholder value "yes".
	if len(annotations) > 0 {
		claim.Annotations = make(map[string]string)
		for _, a := range annotations {
			claim.Annotations[a] = "yes"
		}
	}
	return &claim
}
|
|
|
|
|
|
|
|
// newClaimArray returns array with a single claim that would be returned by
|
|
|
|
// newClaim() with the same parameters.
|
|
|
|
// newClaimArray returns array with a single claim that would be returned by
// newClaim() with the same parameters.
func newClaimArray(name, claimUID, capacity, boundToVolume string, phase api.PersistentVolumeClaimPhase, annotations ...string) []*api.PersistentVolumeClaim {
	return []*api.PersistentVolumeClaim{
		newClaim(name, claimUID, capacity, boundToVolume, phase, annotations...),
	}
}
|
|
|
|
|
|
|
|
// testSyncClaim calls syncClaim on the first claim in test.initialClaims and
// propagates its error.
func testSyncClaim(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
	return ctrl.syncClaim(test.initialClaims[0])
}
|
|
|
|
|
|
|
|
func testSyncClaimError(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
|
|
|
err := ctrl.syncClaim(test.initialClaims[0])
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
return fmt.Errorf("syncClaim succeeded when failure was expected")
|
|
|
|
}
|
|
|
|
|
|
|
|
// testSyncVolume calls syncVolume on the first volume in test.initialVolumes
// and propagates its error.
func testSyncVolume(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
	return ctrl.syncVolume(test.initialVolumes[0])
}
|
|
|
|
|
2016-05-17 12:55:22 +00:00
|
|
|
// operationType selects which mock plugin wrapTestWithControllerConfig
// installs before running the wrapped test.
type operationType string

const operationDelete = "Delete"
const operationRecycle = "Recycle"
const operationProvision = "Provision"
|
2016-05-17 12:55:22 +00:00
|
|
|
|
|
|
|
// wrapTestWithControllerConfig returns a testCall that:
|
2016-05-17 12:55:26 +00:00
|
|
|
// - configures controller with recycler, deleter or provisioner which will
|
|
|
|
// return provided errors when a volume is deleted, recycled or provisioned
|
2016-05-17 12:55:22 +00:00
|
|
|
// - calls given testCall
|
|
|
|
func wrapTestWithControllerConfig(operation operationType, expectedOperationCalls []error, toWrap testCall) testCall {
|
|
|
|
expected := expectedOperationCalls
|
|
|
|
|
|
|
|
return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
|
|
|
|
switch operation {
|
|
|
|
case operationDelete:
|
|
|
|
addDeletePlugin(ctrl, expected)
|
|
|
|
case operationRecycle:
|
|
|
|
addRecyclePlugin(ctrl, expected)
|
2016-05-17 12:55:26 +00:00
|
|
|
case operationProvision:
|
|
|
|
addProvisionPlugin(ctrl, expected)
|
2016-05-17 12:55:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return toWrap(ctrl, reactor, test)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// wrapTestWithInjectedOperation returns a testCall that:
// - starts the controller and lets it run original testCall until
//   scheduleOperation() call. It blocks the controller there and calls the
//   injected function to simulate that something is happening when the
//   controller waits for the operation lock. Controller is then resumed and we
//   check how it behaves.
func wrapTestWithInjectedOperation(toWrap testCall, injectBeforeOperation func(ctrl *PersistentVolumeController, reactor *volumeReactor)) testCall {

	return func(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest) error {
		// Inject a hook before async operation starts
		ctrl.preOperationHook = func(operationName string, arg interface{}) {
			// Inside the hook, run the function to inject
			glog.V(4).Infof("reactor: scheduleOperation reached, injecting call")
			injectBeforeOperation(ctrl, reactor)
		}

		// Run the tested function (typically syncClaim/syncVolume) in a
		// separate goroutine.
		var testError error
		var testFinished int32

		go func() {
			testError = toWrap(ctrl, reactor, test)
			// Let the "main" test function know that syncVolume has finished.
			atomic.StoreInt32(&testFinished, 1)
		}()

		// Wait for the controller to finish the test function.
		// NOTE(review): the atomic store/load pair on testFinished is relied
		// on to also publish the plain write to testError; a sync.WaitGroup
		// or channel would make that happens-before edge explicit — confirm
		// with -race before changing.
		for atomic.LoadInt32(&testFinished) == 0 {
			time.Sleep(time.Millisecond * 10)
		}

		// The goroutine has finished; its result can be read safely now.
		return testError
	}
}
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
func evaluateTestResults(ctrl *PersistentVolumeController, reactor *volumeReactor, test controllerTest, t *testing.T) {
|
|
|
|
// Evaluate results
|
|
|
|
if err := reactor.checkClaims(t, test.expectedClaims); err != nil {
|
|
|
|
t.Errorf("Test %q: %v", test.name, err)
|
|
|
|
|
|
|
|
}
|
|
|
|
if err := reactor.checkVolumes(t, test.expectedVolumes); err != nil {
|
|
|
|
t.Errorf("Test %q: %v", test.name, err)
|
|
|
|
}
|
2016-05-17 12:55:14 +00:00
|
|
|
|
|
|
|
if err := checkEvents(t, test.expectedEvents, ctrl); err != nil {
|
|
|
|
t.Errorf("Test %q: %v", test.name, err)
|
|
|
|
}
|
2016-05-17 12:55:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Test single call to syncClaim and syncVolume methods.
|
|
|
|
// For all tests:
|
|
|
|
// 1. Fill in the controller with initial data
|
|
|
|
// 2. Call the tested function (syncClaim/syncVolume) via
|
|
|
|
// controllerTest.testCall *once*.
|
|
|
|
// 3. Compare resulting volumes and claims with expected volumes and claims.
|
|
|
|
func runSyncTests(t *testing.T, tests []controllerTest) {
|
|
|
|
for _, test := range tests {
|
|
|
|
glog.V(4).Infof("starting test %q", test.name)
|
|
|
|
|
|
|
|
// Initialize the controller
|
|
|
|
client := &fake.Clientset{}
|
2016-05-17 12:55:34 +00:00
|
|
|
ctrl := newTestController(client, nil, nil)
|
2016-05-17 12:55:28 +00:00
|
|
|
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
|
2016-05-17 12:55:07 +00:00
|
|
|
for _, claim := range test.initialClaims {
|
|
|
|
ctrl.claims.Add(claim)
|
|
|
|
reactor.claims[claim.Name] = claim
|
|
|
|
}
|
|
|
|
for _, volume := range test.initialVolumes {
|
|
|
|
ctrl.volumes.store.Add(volume)
|
|
|
|
reactor.volumes[volume.Name] = volume
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run the tested functions
|
|
|
|
err := test.test(ctrl, reactor, test)
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("Test %q failed: %v", test.name, err)
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:22 +00:00
|
|
|
// Wait for all goroutines to finish
|
|
|
|
reactor.waitTest()
|
|
|
|
|
2016-05-17 12:55:07 +00:00
|
|
|
evaluateTestResults(ctrl, reactor, test, t)
|
|
|
|
}
|
|
|
|
}
|
2016-05-17 12:55:12 +00:00
|
|
|
|
|
|
|
// Test multiple calls to syncClaim/syncVolume and periodic sync of all
|
|
|
|
// volume/claims. For all tests, the test follows this pattern:
|
|
|
|
// 0. Load the controller with initial data.
|
|
|
|
// 1. Call controllerTest.testCall() once as in TestSync()
|
|
|
|
// 2. For all volumes/claims changed by previous syncVolume/syncClaim calls,
|
|
|
|
// call appropriate syncVolume/syncClaim (simulating "volume/claim changed"
|
|
|
|
// events). Go to 2. if these calls change anything.
|
|
|
|
// 3. When all changes are processed and no new changes were made, call
|
|
|
|
// syncVolume/syncClaim on all volumes/claims (simulating "periodic sync").
|
|
|
|
// 4. If some changes were done by step 3., go to 2. (simulation of
|
|
|
|
// "volume/claim updated" events, eventually performing step 3. again)
|
|
|
|
// 5. When 3. does not do any changes, finish the tests and compare final set
|
|
|
|
// of volumes/claims with expected claims/volumes and report differences.
|
|
|
|
// Some limit of calls in enforced to prevent endless loops.
|
|
|
|
func runMultisyncTests(t *testing.T, tests []controllerTest) {
|
|
|
|
for _, test := range tests {
|
|
|
|
glog.V(4).Infof("starting multisync test %q", test.name)
|
|
|
|
|
|
|
|
// Initialize the controller
|
|
|
|
client := &fake.Clientset{}
|
2016-05-17 12:55:34 +00:00
|
|
|
ctrl := newTestController(client, nil, nil)
|
2016-05-17 12:55:28 +00:00
|
|
|
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
|
2016-05-17 12:55:12 +00:00
|
|
|
for _, claim := range test.initialClaims {
|
|
|
|
ctrl.claims.Add(claim)
|
|
|
|
reactor.claims[claim.Name] = claim
|
|
|
|
}
|
|
|
|
for _, volume := range test.initialVolumes {
|
|
|
|
ctrl.volumes.store.Add(volume)
|
|
|
|
reactor.volumes[volume.Name] = volume
|
|
|
|
}
|
|
|
|
|
|
|
|
// Run the tested function
|
|
|
|
err := test.test(ctrl, reactor, test)
|
|
|
|
if err != nil {
|
|
|
|
t.Errorf("Test %q failed: %v", test.name, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Simulate any "changed" events and "periodical sync" until we reach a
|
|
|
|
// stable state.
|
|
|
|
firstSync := true
|
|
|
|
counter := 0
|
|
|
|
for {
|
|
|
|
counter++
|
|
|
|
glog.V(4).Infof("test %q: iteration %d", test.name, counter)
|
|
|
|
|
|
|
|
if counter > 100 {
|
|
|
|
t.Errorf("Test %q failed: too many iterations", test.name)
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:22 +00:00
|
|
|
// Wait for all goroutines to finish
|
|
|
|
reactor.waitTest()
|
|
|
|
|
2016-05-17 12:55:12 +00:00
|
|
|
obj := reactor.popChange()
|
|
|
|
if obj == nil {
|
|
|
|
// Nothing was changed, should we exit?
|
|
|
|
if firstSync || reactor.changedSinceLastSync > 0 {
|
|
|
|
// There were some changes after the last "periodic sync".
|
|
|
|
// Simulate "periodic sync" of everything (until it produces
|
|
|
|
// no changes).
|
|
|
|
firstSync = false
|
|
|
|
glog.V(4).Infof("test %q: simulating periodical sync of all claims and volumes", test.name)
|
|
|
|
reactor.syncAll()
|
|
|
|
} else {
|
|
|
|
// Last sync did not produce any updates, the test reached
|
|
|
|
// stable state -> finish.
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// There were some changes, process them
|
|
|
|
switch obj.(type) {
|
|
|
|
case *api.PersistentVolumeClaim:
|
|
|
|
claim := obj.(*api.PersistentVolumeClaim)
|
|
|
|
// Simulate "claim updated" event
|
|
|
|
ctrl.claims.Update(claim)
|
|
|
|
err = ctrl.syncClaim(claim)
|
|
|
|
if err != nil {
|
|
|
|
if err == versionConflictError {
|
|
|
|
// Ignore version errors
|
|
|
|
glog.V(4).Infof("test intentionaly ignores version error.")
|
|
|
|
} else {
|
|
|
|
t.Errorf("Error calling syncClaim: %v", err)
|
|
|
|
// Finish the loop on the first error
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Process generated changes
|
|
|
|
continue
|
|
|
|
case *api.PersistentVolume:
|
|
|
|
volume := obj.(*api.PersistentVolume)
|
|
|
|
// Simulate "volume updated" event
|
|
|
|
ctrl.volumes.store.Update(volume)
|
|
|
|
err = ctrl.syncVolume(volume)
|
|
|
|
if err != nil {
|
|
|
|
if err == versionConflictError {
|
|
|
|
// Ignore version errors
|
|
|
|
glog.V(4).Infof("test intentionaly ignores version error.")
|
|
|
|
} else {
|
|
|
|
t.Errorf("Error calling syncVolume: %v", err)
|
|
|
|
// Finish the loop on the first error
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// Process generated changes
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
evaluateTestResults(ctrl, reactor, test, t)
|
|
|
|
glog.V(4).Infof("test %q finished after %d iterations", test.name, counter)
|
|
|
|
}
|
|
|
|
}
|
2016-05-17 12:55:22 +00:00
|
|
|
|
|
|
|
// Dummy volume plugin for provisioning, deletion and recycling. It contains
// lists of expected return values to simulate errors.
type mockVolumePlugin struct {
	// provisionCalls holds the error to return from each successive
	// Provision() call; provisionCallCounter indexes the next one.
	provisionCalls       []error
	provisionCallCounter int
	// deleteCalls holds the error to return from each successive Delete()
	// call; deleteCallCounter indexes the next one.
	deleteCalls       []error
	deleteCallCounter int
	// recycleCalls holds the error to return from each successive Recycle()
	// call; recycleCallCounter indexes the next one.
	recycleCalls       []error
	recycleCallCounter int
	// provisionOptions remembers the options passed to the most recent
	// NewProvisioner() call so Provision() can build a matching PV.
	provisionOptions vol.VolumeOptions
}

// Compile-time checks that mockVolumePlugin satisfies all plugin interfaces
// the controller may look up.
var _ vol.VolumePlugin = &mockVolumePlugin{}

var _ vol.RecyclableVolumePlugin = &mockVolumePlugin{}

var _ vol.DeletableVolumePlugin = &mockVolumePlugin{}

var _ vol.ProvisionableVolumePlugin = &mockVolumePlugin{}
|
2016-05-17 12:55:22 +00:00
|
|
|
|
|
|
|
// Init is part of vol.VolumePlugin; the mock needs no initialization.
func (plugin *mockVolumePlugin) Init(host vol.VolumeHost) error {
	return nil
}
|
|
|
|
|
|
|
|
// Name returns the fixed name under which the mock plugin is registered.
func (plugin *mockVolumePlugin) Name() string {
	return mockPluginName
}
|
|
|
|
|
|
|
|
// CanSupport is part of vol.VolumePlugin; the mock claims to support every
// volume spec so tests can use it unconditionally.
func (plugin *mockVolumePlugin) CanSupport(spec *vol.Spec) bool {
	return true
}
|
|
|
|
|
|
|
|
// NewMounter is part of vol.VolumePlugin; mounting is intentionally
// unsupported by the mock.
func (plugin *mockVolumePlugin) NewMounter(spec *vol.Spec, podRef *api.Pod, opts vol.VolumeOptions) (vol.Mounter, error) {
	return nil, fmt.Errorf("Mounter is not supported by this plugin")
}
|
|
|
|
|
|
|
|
// NewUnmounter is part of vol.VolumePlugin; unmounting is intentionally
// unsupported by the mock.
func (plugin *mockVolumePlugin) NewUnmounter(name string, podUID types.UID) (vol.Unmounter, error) {
	return nil, fmt.Errorf("Unmounter is not supported by this plugin")
}
|
|
|
|
|
|
|
|
// Provisioner interfaces
|
|
|
|
|
|
|
|
func (plugin *mockVolumePlugin) NewProvisioner(options vol.VolumeOptions) (vol.Provisioner, error) {
|
|
|
|
if len(plugin.provisionCalls) > 0 {
|
|
|
|
// mockVolumePlugin directly implements Provisioner interface
|
|
|
|
glog.V(4).Infof("mock plugin NewProvisioner called, returning mock provisioner")
|
2016-05-17 12:55:24 +00:00
|
|
|
plugin.provisionOptions = options
|
2016-05-17 12:55:22 +00:00
|
|
|
return plugin, nil
|
|
|
|
} else {
|
|
|
|
return nil, fmt.Errorf("Mock plugin error: no provisionCalls configured")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:24 +00:00
|
|
|
func (plugin *mockVolumePlugin) Provision() (*api.PersistentVolume, error) {
|
2016-05-17 12:55:22 +00:00
|
|
|
if len(plugin.provisionCalls) <= plugin.provisionCallCounter {
|
2016-05-17 12:55:24 +00:00
|
|
|
return nil, fmt.Errorf("Mock plugin error: unexpected provisioner call %d", plugin.provisionCallCounter)
|
2016-05-17 12:55:22 +00:00
|
|
|
}
|
|
|
|
|
2016-05-17 12:55:24 +00:00
|
|
|
var pv *api.PersistentVolume
|
|
|
|
err := plugin.provisionCalls[plugin.provisionCallCounter]
|
|
|
|
if err == nil {
|
2016-05-17 12:55:26 +00:00
|
|
|
// Create a fake PV with known GCE volume (to match expected volume)
|
2016-05-17 12:55:24 +00:00
|
|
|
pv = &api.PersistentVolume{
|
|
|
|
ObjectMeta: api.ObjectMeta{
|
|
|
|
Name: plugin.provisionOptions.PVName,
|
|
|
|
},
|
|
|
|
Spec: api.PersistentVolumeSpec{
|
|
|
|
Capacity: api.ResourceList{
|
|
|
|
api.ResourceName(api.ResourceStorage): plugin.provisionOptions.Capacity,
|
|
|
|
},
|
2016-05-17 12:55:26 +00:00
|
|
|
AccessModes: plugin.provisionOptions.AccessModes,
|
|
|
|
PersistentVolumeReclaimPolicy: plugin.provisionOptions.PersistentVolumeReclaimPolicy,
|
2016-05-17 12:55:24 +00:00
|
|
|
PersistentVolumeSource: api.PersistentVolumeSource{
|
2016-05-17 12:55:26 +00:00
|
|
|
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{},
|
2016-05-17 12:55:24 +00:00
|
|
|
},
|
|
|
|
},
|
|
|
|
}
|
2016-05-17 12:55:22 +00:00
|
|
|
}
|
2016-05-17 12:55:24 +00:00
|
|
|
|
2016-05-17 12:55:22 +00:00
|
|
|
plugin.provisionCallCounter++
|
2016-05-17 12:55:24 +00:00
|
|
|
glog.V(4).Infof("mock plugin Provision call nr. %d, returning %v: %v", plugin.provisionCallCounter, pv, err)
|
|
|
|
return pv, err
|
2016-05-17 12:55:22 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Deleter interfaces
|
|
|
|
|
|
|
|
func (plugin *mockVolumePlugin) NewDeleter(spec *vol.Spec) (vol.Deleter, error) {
|
|
|
|
if len(plugin.deleteCalls) > 0 {
|
|
|
|
// mockVolumePlugin directly implements Deleter interface
|
|
|
|
glog.V(4).Infof("mock plugin NewDeleter called, returning mock deleter")
|
|
|
|
return plugin, nil
|
|
|
|
} else {
|
|
|
|
return nil, fmt.Errorf("Mock plugin error: no deleteCalls configured")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (plugin *mockVolumePlugin) Delete() error {
|
|
|
|
if len(plugin.deleteCalls) <= plugin.deleteCallCounter {
|
|
|
|
return fmt.Errorf("Mock plugin error: unexpected deleter call %d", plugin.deleteCallCounter)
|
|
|
|
}
|
|
|
|
ret := plugin.deleteCalls[plugin.deleteCallCounter]
|
|
|
|
plugin.deleteCallCounter++
|
|
|
|
glog.V(4).Infof("mock plugin Delete call nr. %d, returning %v", plugin.deleteCallCounter, ret)
|
|
|
|
return ret
|
|
|
|
}
|
|
|
|
|
|
|
|
// Volume interfaces

// GetPath is part of the vol.Volume interface; the mock has no filesystem
// path.
func (plugin *mockVolumePlugin) GetPath() string {
	return ""
}
|
|
|
|
|
|
|
|
// GetMetrics is part of the vol.Volume interface; the mock reports no
// metrics.
func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) {
	return nil, nil
}
|
|
|
|
|
|
|
|
// Recycler interfaces
|
|
|
|
|
2016-05-19 10:58:25 +00:00
|
|
|
func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec) (vol.Recycler, error) {
|
2016-05-17 12:55:22 +00:00
|
|
|
if len(plugin.recycleCalls) > 0 {
|
|
|
|
// mockVolumePlugin directly implements Recycler interface
|
|
|
|
glog.V(4).Infof("mock plugin NewRecycler called, returning mock recycler")
|
|
|
|
return plugin, nil
|
|
|
|
} else {
|
|
|
|
return nil, fmt.Errorf("Mock plugin error: no recycleCalls configured")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (plugin *mockVolumePlugin) Recycle() error {
|
|
|
|
if len(plugin.recycleCalls) <= plugin.recycleCallCounter {
|
|
|
|
return fmt.Errorf("Mock plugin error: unexpected recycle call %d", plugin.recycleCallCounter)
|
|
|
|
}
|
|
|
|
ret := plugin.recycleCalls[plugin.recycleCallCounter]
|
|
|
|
plugin.recycleCallCounter++
|
|
|
|
glog.V(4).Infof("mock plugin Recycle call nr. %d, returning %v", plugin.recycleCallCounter, ret)
|
|
|
|
return ret
|
|
|
|
}
|