Merge pull request #41273 from wongma7/pv-controller-shared

Automatic merge from submit-queue (batch tested with PRs 41604, 41273, 41547)

Switch pv controller to shared informer

This is WIP because I still need to do something with bazel? and add 'get storageclasses' to the controller-manager rbac role

@jsafrane PTAL and make sure I did not break anything in the PV controller. Do we need to clone the volumes/claims we get from the shared informer before we use them? I could not find a place where we modify them but you would know for certain.

cc @ncdc because I copied what you did in your other PRs.
pull/6/head
Kubernetes Submit Queue 2017-02-17 07:20:35 -08:00 committed by GitHub
commit 7da78faf06
13 changed files with 216 additions and 635 deletions

View File

@ -483,6 +483,9 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration), VolumePlugins: ProbeControllerVolumePlugins(cloud, s.VolumeConfiguration),
Cloud: cloud, Cloud: cloud,
ClusterName: s.ClusterName, ClusterName: s.ClusterName,
VolumeInformer: newSharedInformers.Core().V1().PersistentVolumes(),
ClaimInformer: newSharedInformers.Core().V1().PersistentVolumeClaims(),
ClassInformer: newSharedInformers.Storage().V1beta1().StorageClasses(),
EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning, EnableDynamicProvisioning: s.VolumeConfiguration.EnableDynamicProvisioning,
} }
volumeController := persistentvolumecontroller.NewController(params) volumeController := persistentvolumecontroller.NewController(params)

View File

@ -23,6 +23,10 @@ go_library(
"//pkg/apis/storage/v1beta1:go_default_library", "//pkg/apis/storage/v1beta1:go_default_library",
"//pkg/apis/storage/v1beta1/util:go_default_library", "//pkg/apis/storage/v1beta1/util:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/storage/v1beta1:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/client/listers/storage/v1beta1:go_default_library",
"//pkg/cloudprovider:go_default_library", "//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/util/goroutinemap:go_default_library", "//pkg/util/goroutinemap:go_default_library",
@ -34,10 +38,9 @@ go_library(
"//vendor:k8s.io/apimachinery/pkg/api/meta", "//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1", "//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1", "//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/cache",
@ -67,7 +70,9 @@ go_test(
"//pkg/apis/storage/v1beta1/util:go_default_library", "//pkg/apis/storage/v1beta1/util:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library", "//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/controller/volume/persistentvolume/testing:go_default_library", "//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/client/listers/storage/v1beta1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/volume:go_default_library", "//pkg/volume:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/resource", "//vendor:k8s.io/apimachinery/pkg/api/resource",
@ -76,6 +81,7 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/diff", "//vendor:k8s.io/apimachinery/pkg/util/diff",
"//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/testing", "//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache", "//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/tools/record", "//vendor:k8s.io/client-go/tools/record",
@ -94,7 +100,6 @@ filegroup(
srcs = [ srcs = [
":package-srcs", ":package-srcs",
"//pkg/controller/volume/persistentvolume/options:all-srcs", "//pkg/controller/volume/persistentvolume/options:all-srcs",
"//pkg/controller/volume/persistentvolume/testing:all-srcs",
], ],
tags = ["automanaged"], tags = ["automanaged"],
) )

View File

@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing" core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
@ -45,7 +46,9 @@ import (
storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util" storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
fcache "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/v1beta1"
"k8s.io/kubernetes/pkg/controller"
vol "k8s.io/kubernetes/pkg/volume" vol "k8s.io/kubernetes/pkg/volume"
) )
@ -112,9 +115,10 @@ var noerrors = []reactorError{}
// is updated first and claim.Phase second. This queue will then contain both // is updated first and claim.Phase second. This queue will then contain both
// updates as separate entries. // updates as separate entries.
// - Number of changes since the last call to volumeReactor.syncAll(). // - Number of changes since the last call to volumeReactor.syncAll().
// - Optionally, volume and claim event sources. When set, all changed // - Optionally, volume and claim fake watchers which should be the same ones
// volumes/claims are sent as Modify event to these sources. These sources can // used by the controller. Any time an event function like deleteVolumeEvent
// be linked back to the controller watcher as "volume/claim updated" events. // is called to simulate an event, the reactor's stores are updated and the
// controller is sent the event via the fake watcher.
// - Optionally, list of error that should be returned by reactor, simulating // - Optionally, list of error that should be returned by reactor, simulating
// etcd / API server failures. These errors are evaluated in order and every // etcd / API server failures. These errors are evaluated in order and every
// error is returned only once. I.e. when the reactor finds matching // error is returned only once. I.e. when the reactor finds matching
@ -126,8 +130,8 @@ type volumeReactor struct {
changedObjects []interface{} changedObjects []interface{}
changedSinceLastSync int changedSinceLastSync int
ctrl *PersistentVolumeController ctrl *PersistentVolumeController
volumeSource *fcache.FakePVControllerSource fakeVolumeWatch *watch.FakeWatcher
claimSource *fcache.FakePVCControllerSource fakeClaimWatch *watch.FakeWatcher
lock sync.Mutex lock sync.Mutex
errors []reactorError errors []reactorError
} }
@ -176,9 +180,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
} }
// Store the updated object to appropriate places. // Store the updated object to appropriate places.
if r.volumeSource != nil {
r.volumeSource.Add(volume)
}
r.volumes[volume.Name] = volume r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume) r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++ r.changedSinceLastSync++
@ -203,9 +204,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
} }
// Store the updated object to appropriate places. // Store the updated object to appropriate places.
if r.volumeSource != nil {
r.volumeSource.Modify(volume)
}
r.volumes[volume.Name] = volume r.volumes[volume.Name] = volume
r.changedObjects = append(r.changedObjects, volume) r.changedObjects = append(r.changedObjects, volume)
r.changedSinceLastSync++ r.changedSinceLastSync++
@ -231,9 +229,6 @@ func (r *volumeReactor) React(action core.Action) (handled bool, ret runtime.Obj
// Store the updated object to appropriate places. // Store the updated object to appropriate places.
r.claims[claim.Name] = claim r.claims[claim.Name] = claim
if r.claimSource != nil {
r.claimSource.Modify(claim)
}
r.changedObjects = append(r.changedObjects, claim) r.changedObjects = append(r.changedObjects, claim)
r.changedSinceLastSync++ r.changedSinceLastSync++
glog.V(4).Infof("saved updated claim %s", claim.Name) glog.V(4).Infof("saved updated claim %s", claim.Name)
@ -513,9 +508,11 @@ func (r *volumeReactor) deleteVolumeEvent(volume *v1.PersistentVolume) {
// Generate deletion event. Cloned volume is needed to prevent races (and we // Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too). // would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
clone, _ := api.Scheme.DeepCopy(volume) clone, _ := api.Scheme.DeepCopy(volume)
volumeClone := clone.(*v1.PersistentVolume) volumeClone := clone.(*v1.PersistentVolume)
r.volumeSource.Delete(volumeClone) r.fakeVolumeWatch.Delete(volumeClone)
}
} }
// deleteClaimEvent simulates that a claim has been deleted in etcd and the // deleteClaimEvent simulates that a claim has been deleted in etcd and the
@ -529,9 +526,11 @@ func (r *volumeReactor) deleteClaimEvent(claim *v1.PersistentVolumeClaim) {
// Generate deletion event. Cloned volume is needed to prevent races (and we // Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too). // would get a clone from etcd too).
if r.fakeClaimWatch != nil {
clone, _ := api.Scheme.DeepCopy(claim) clone, _ := api.Scheme.DeepCopy(claim)
claimClone := clone.(*v1.PersistentVolumeClaim) claimClone := clone.(*v1.PersistentVolumeClaim)
r.claimSource.Delete(claimClone) r.fakeClaimWatch.Delete(claimClone)
}
} }
// addVolumeEvent simulates that a volume has been added in etcd and the // addVolumeEvent simulates that a volume has been added in etcd and the
@ -543,7 +542,9 @@ func (r *volumeReactor) addVolumeEvent(volume *v1.PersistentVolume) {
r.volumes[volume.Name] = volume r.volumes[volume.Name] = volume
// Generate event. No cloning is needed, this claim is not stored in the // Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet. // controller cache yet.
r.volumeSource.Add(volume) if r.fakeVolumeWatch != nil {
r.fakeVolumeWatch.Add(volume)
}
} }
// modifyVolumeEvent simulates that a volume has been modified in etcd and the // modifyVolumeEvent simulates that a volume has been modified in etcd and the
@ -555,9 +556,11 @@ func (r *volumeReactor) modifyVolumeEvent(volume *v1.PersistentVolume) {
r.volumes[volume.Name] = volume r.volumes[volume.Name] = volume
// Generate deletion event. Cloned volume is needed to prevent races (and we // Generate deletion event. Cloned volume is needed to prevent races (and we
// would get a clone from etcd too). // would get a clone from etcd too).
if r.fakeVolumeWatch != nil {
clone, _ := api.Scheme.DeepCopy(volume) clone, _ := api.Scheme.DeepCopy(volume)
volumeClone := clone.(*v1.PersistentVolume) volumeClone := clone.(*v1.PersistentVolume)
r.volumeSource.Modify(volumeClone) r.fakeVolumeWatch.Modify(volumeClone)
}
} }
// addClaimEvent simulates that a claim has been deleted in etcd and the // addClaimEvent simulates that a claim has been deleted in etcd and the
@ -569,45 +572,49 @@ func (r *volumeReactor) addClaimEvent(claim *v1.PersistentVolumeClaim) {
r.claims[claim.Name] = claim r.claims[claim.Name] = claim
// Generate event. No cloning is needed, this claim is not stored in the // Generate event. No cloning is needed, this claim is not stored in the
// controller cache yet. // controller cache yet.
r.claimSource.Add(claim) if r.fakeClaimWatch != nil {
r.fakeClaimWatch.Add(claim)
}
} }
func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, volumeSource *fcache.FakePVControllerSource, claimSource *fcache.FakePVCControllerSource, errors []reactorError) *volumeReactor { func newVolumeReactor(client *fake.Clientset, ctrl *PersistentVolumeController, fakeVolumeWatch, fakeClaimWatch *watch.FakeWatcher, errors []reactorError) *volumeReactor {
reactor := &volumeReactor{ reactor := &volumeReactor{
volumes: make(map[string]*v1.PersistentVolume), volumes: make(map[string]*v1.PersistentVolume),
claims: make(map[string]*v1.PersistentVolumeClaim), claims: make(map[string]*v1.PersistentVolumeClaim),
ctrl: ctrl, ctrl: ctrl,
volumeSource: volumeSource, fakeVolumeWatch: fakeVolumeWatch,
claimSource: claimSource, fakeClaimWatch: fakeClaimWatch,
errors: errors, errors: errors,
} }
client.AddReactor("*", "*", reactor.React) client.AddReactor("create", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumes", reactor.React)
client.AddReactor("update", "persistentvolumeclaims", reactor.React)
client.AddReactor("get", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumes", reactor.React)
client.AddReactor("delete", "persistentvolumeclaims", reactor.React)
return reactor return reactor
} }
func alwaysReady() bool { return true }
func newTestController(kubeClient clientset.Interface, volumeSource, claimSource, classSource cache.ListerWatcher, enableDynamicProvisioning bool) *PersistentVolumeController { func newTestController(kubeClient clientset.Interface, informerFactory informers.SharedInformerFactory, enableDynamicProvisioning bool) *PersistentVolumeController {
if volumeSource == nil { if informerFactory == nil {
volumeSource = fcache.NewFakePVControllerSource() informerFactory = informers.NewSharedInformerFactory(kubeClient, controller.NoResyncPeriodFunc())
} }
if claimSource == nil {
claimSource = fcache.NewFakePVCControllerSource()
}
if classSource == nil {
classSource = fcache.NewFakeControllerSource()
}
params := ControllerParameters{ params := ControllerParameters{
KubeClient: kubeClient, KubeClient: kubeClient,
SyncPeriod: 5 * time.Second, SyncPeriod: 5 * time.Second,
VolumePlugins: []vol.VolumePlugin{}, VolumePlugins: []vol.VolumePlugin{},
VolumeSource: volumeSource, VolumeInformer: informerFactory.Core().V1().PersistentVolumes(),
ClaimSource: claimSource, ClaimInformer: informerFactory.Core().V1().PersistentVolumeClaims(),
ClassSource: classSource, ClassInformer: informerFactory.Storage().V1beta1().StorageClasses(),
EventRecorder: record.NewFakeRecorder(1000), EventRecorder: record.NewFakeRecorder(1000),
EnableDynamicProvisioning: enableDynamicProvisioning, EnableDynamicProvisioning: enableDynamicProvisioning,
} }
ctrl := NewController(params) ctrl := NewController(params)
ctrl.volumeListerSynced = alwaysReady
ctrl.claimListerSynced = alwaysReady
ctrl.classListerSynced = alwaysReady
// Speed up the test // Speed up the test
ctrl.createProvisionedPVInterval = 5 * time.Millisecond ctrl.createProvisionedPVInterval = 5 * time.Millisecond
return ctrl return ctrl
@ -924,7 +931,7 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
// Initialize the controller // Initialize the controller
client := &fake.Clientset{} client := &fake.Clientset{}
ctrl := newTestController(client, nil, nil, nil, true) ctrl := newTestController(client, nil, true)
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims { for _, claim := range test.initialClaims {
ctrl.claims.Add(claim) ctrl.claims.Add(claim)
@ -935,14 +942,12 @@ func runSyncTests(t *testing.T, tests []controllerTest, storageClasses []*storag
reactor.volumes[volume.Name] = volume reactor.volumes[volume.Name] = volume
} }
// Convert classes to []interface{} and forcefully inject them into // Inject classes into controller via a custom lister.
// controller. indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
storageClassPtrs := make([]interface{}, len(storageClasses)) for _, class := range storageClasses {
for i, s := range storageClasses { indexer.Add(class)
storageClassPtrs[i] = s
} }
// 1 is the resource version ctrl.classLister = storagelisters.NewStorageClassLister(indexer)
ctrl.classes.Replace(storageClassPtrs, "1")
// Run the tested functions // Run the tested functions
err := test.test(ctrl, reactor, test) err := test.test(ctrl, reactor, test)
@ -980,15 +985,14 @@ func runMultisyncTests(t *testing.T, tests []controllerTest, storageClasses []*s
// Initialize the controller // Initialize the controller
client := &fake.Clientset{} client := &fake.Clientset{}
ctrl := newTestController(client, nil, nil, nil, true) ctrl := newTestController(client, nil, true)
// Convert classes to []interface{} and forcefully inject them into // Inject classes into controller via a custom lister.
// controller. indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
storageClassPtrs := make([]interface{}, len(storageClasses)) for _, class := range storageClasses {
for i, s := range storageClasses { indexer.Add(class)
storageClassPtrs[i] = s
} }
ctrl.classes.Replace(storageClassPtrs, "1") ctrl.classLister = storagelisters.NewStorageClassLister(indexer)
reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors) reactor := newVolumeReactor(client, ctrl, nil, nil, test.errors)
for _, claim := range test.initialClaims { for _, claim := range test.initialClaims {

View File

@ -422,7 +422,7 @@ func TestProvisionMultiSync(t *testing.T) {
// When provisioning is disabled, provisioning a claim should instantly return nil // When provisioning is disabled, provisioning a claim should instantly return nil
func TestDisablingDynamicProvisioner(t *testing.T) { func TestDisablingDynamicProvisioner(t *testing.T) {
ctrl := newTestController(nil, nil, nil, nil, false) ctrl := newTestController(nil, nil, false)
retVal := ctrl.provisionClaim(nil) retVal := ctrl.provisionClaim(nil)
if retVal != nil { if retVal != nil {
t.Errorf("Expected nil return but got %v", retVal) t.Errorf("Expected nil return but got %v", retVal)

View File

@ -31,6 +31,8 @@ import (
storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1" storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util" storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
storagelisters "k8s.io/kubernetes/pkg/client/listers/storage/v1beta1"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/goroutinemap"
vol "k8s.io/kubernetes/pkg/volume" vol "k8s.io/kubernetes/pkg/volume"
@ -146,14 +148,13 @@ const createProvisionedPVInterval = 10 * time.Second
// cache.Controllers that watch PersistentVolume and PersistentVolumeClaim // cache.Controllers that watch PersistentVolume and PersistentVolumeClaim
// changes. // changes.
type PersistentVolumeController struct { type PersistentVolumeController struct {
volumeController cache.Controller volumeLister corelisters.PersistentVolumeLister
volumeInformer cache.Indexer volumeListerSynced cache.InformerSynced
volumeSource cache.ListerWatcher claimLister corelisters.PersistentVolumeClaimLister
claimController cache.Controller claimListerSynced cache.InformerSynced
claimInformer cache.Store classLister storagelisters.StorageClassLister
claimSource cache.ListerWatcher classListerSynced cache.InformerSynced
classReflector *cache.Reflector
classSource cache.ListerWatcher
kubeClient clientset.Interface kubeClient clientset.Interface
eventRecorder record.EventRecorder eventRecorder record.EventRecorder
cloud cloudprovider.Interface cloud cloudprovider.Interface
@ -182,7 +183,6 @@ type PersistentVolumeController struct {
// have been already written. // have been already written.
volumes persistentVolumeOrderedIndex volumes persistentVolumeOrderedIndex
claims cache.Store claims cache.Store
classes cache.Store
// Work queues of claims and volumes to process. Every queue should have // Work queues of claims and volumes to process. Every queue should have
// exactly one worker thread, especially syncClaim() is not reentrant. // exactly one worker thread, especially syncClaim() is not reentrant.
@ -1464,17 +1464,10 @@ func (ctrl *PersistentVolumeController) findProvisionablePlugin(claim *v1.Persis
// provisionClaim() which leads here is never called with claimClass=="", we // provisionClaim() which leads here is never called with claimClass=="", we
// can save some checks. // can save some checks.
claimClass := storageutil.GetClaimStorageClass(claim) claimClass := storageutil.GetClaimStorageClass(claim)
classObj, found, err := ctrl.classes.GetByKey(claimClass) class, err := ctrl.classLister.Get(claimClass)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if !found {
return nil, nil, fmt.Errorf("StorageClass %q not found", claimClass)
}
class, ok := classObj.(*storage.StorageClass)
if !ok {
return nil, nil, fmt.Errorf("Cannot convert object to StorageClass: %+v", classObj)
}
// Find a plugin for the class // Find a plugin for the class
plugin, err := ctrl.volumePluginMgr.FindProvisionablePluginByName(class.Provisioner) plugin, err := ctrl.volumePluginMgr.FindProvisionablePluginByName(class.Provisioner)

View File

@ -24,9 +24,9 @@ import (
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1" v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1" clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
@ -36,6 +36,9 @@ import (
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1" storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
storageinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/storage/v1beta1"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/goroutinemap" "k8s.io/kubernetes/pkg/util/goroutinemap"
@ -57,7 +60,9 @@ type ControllerParameters struct {
VolumePlugins []vol.VolumePlugin VolumePlugins []vol.VolumePlugin
Cloud cloudprovider.Interface Cloud cloudprovider.Interface
ClusterName string ClusterName string
VolumeSource, ClaimSource, ClassSource cache.ListerWatcher VolumeInformer coreinformers.PersistentVolumeInformer
ClaimInformer coreinformers.PersistentVolumeClaimInformer
ClassInformer storageinformers.StorageClassInformer
EventRecorder record.EventRecorder EventRecorder record.EventRecorder
EnableDynamicProvisioning bool EnableDynamicProvisioning bool
} }
@ -94,98 +99,47 @@ func NewController(p ControllerParameters) *PersistentVolumeController {
} }
} }
volumeSource := p.VolumeSource p.VolumeInformer.Informer().AddEventHandlerWithResyncPeriod(
if volumeSource == nil {
volumeSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return p.KubeClient.Core().PersistentVolumes().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return p.KubeClient.Core().PersistentVolumes().Watch(options)
},
}
}
controller.volumeSource = volumeSource
claimSource := p.ClaimSource
if claimSource == nil {
claimSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return p.KubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return p.KubeClient.Core().PersistentVolumeClaims(metav1.NamespaceAll).Watch(options)
},
}
}
controller.claimSource = claimSource
classSource := p.ClassSource
if classSource == nil {
classSource = &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return p.KubeClient.Storage().StorageClasses().List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return p.KubeClient.Storage().StorageClasses().Watch(options)
},
}
}
controller.classSource = classSource
controller.volumeInformer, controller.volumeController = cache.NewIndexerInformer(
volumeSource,
&v1.PersistentVolume{},
p.SyncPeriod,
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
}, },
cache.Indexers{"accessmodes": accessModesIndexFunc},
)
controller.claimInformer, controller.claimController = cache.NewInformer(
claimSource,
&v1.PersistentVolumeClaim{},
p.SyncPeriod, p.SyncPeriod,
)
controller.volumeLister = p.VolumeInformer.Lister()
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
p.ClaimInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{ cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, AddFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) }, DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
}, },
)
// This is just a cache of StorageClass instances, no special actions are
// needed when a class is created/deleted/updated.
controller.classes = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
controller.classReflector = cache.NewReflector(
classSource,
&storage.StorageClass{},
controller.classes,
p.SyncPeriod, p.SyncPeriod,
) )
controller.claimLister = p.ClaimInformer.Lister()
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
controller.classLister = p.ClassInformer.Lister()
controller.classListerSynced = p.ClassInformer.Informer().HasSynced
return controller return controller
} }
// initializeCaches fills all controller caches with initial data from etcd in // initializeCaches fills all controller caches with initial data from etcd in
// order to have the caches already filled when first addClaim/addVolume to // order to have the caches already filled when first addClaim/addVolume to
// perform initial synchronization of the controller. // perform initial synchronization of the controller.
func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSource cache.ListerWatcher) { func (ctrl *PersistentVolumeController) initializeCaches(volumeLister corelisters.PersistentVolumeLister, claimLister corelisters.PersistentVolumeClaimLister) {
volumeListObj, err := volumeSource.List(metav1.ListOptions{}) volumeList, err := volumeLister.List(labels.Everything())
if err != nil { if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return return
} }
volumeList, ok := volumeListObj.(*v1.PersistentVolumeList) for _, volume := range volumeList {
if !ok {
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of volumes, got: %#v", volumeListObj)
return
}
for _, volume := range volumeList.Items {
// Ignore template volumes from kubernetes 1.2 // Ignore template volumes from kubernetes 1.2
deleted := ctrl.upgradeVolumeFrom1_2(&volume) deleted := ctrl.upgradeVolumeFrom1_2(volume)
if !deleted { if !deleted {
clone, err := api.Scheme.DeepCopy(&volume) clone, err := api.Scheme.DeepCopy(volume)
if err != nil { if err != nil {
glog.Errorf("error cloning volume %q: %v", volume.Name, err) glog.Errorf("error cloning volume %q: %v", volume.Name, err)
continue continue
@ -195,20 +149,15 @@ func (ctrl *PersistentVolumeController) initializeCaches(volumeSource, claimSour
} }
} }
claimListObj, err := claimSource.List(metav1.ListOptions{}) claimList, err := claimLister.List(labels.Everything())
if err != nil { if err != nil {
glog.Errorf("PersistentVolumeController can't initialize caches: %v", err) glog.Errorf("PersistentVolumeController can't initialize caches: %v", err)
return return
} }
claimList, ok := claimListObj.(*v1.PersistentVolumeClaimList) for _, claim := range claimList {
if !ok { clone, err := api.Scheme.DeepCopy(claim)
glog.Errorf("PersistentVolumeController can't initialize caches, expected list of claims, got: %#v", claimListObj)
return
}
for _, claim := range claimList.Items {
clone, err := api.Scheme.DeepCopy(&claim)
if err != nil { if err != nil {
glog.Errorf("error cloning claim %q: %v", claimToClaimKey(&claim), err) glog.Errorf("error cloning claim %q: %v", claimToClaimKey(claim), err)
continue continue
} }
claimClone := clone.(*v1.PersistentVolumeClaim) claimClone := clone.(*v1.PersistentVolumeClaim)
@ -326,10 +275,11 @@ func (ctrl *PersistentVolumeController) deleteClaim(claim *v1.PersistentVolumeCl
// Run starts all of this controller's control loops // Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) { func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
glog.V(1).Infof("starting PersistentVolumeController") glog.V(1).Infof("starting PersistentVolumeController")
ctrl.initializeCaches(ctrl.volumeSource, ctrl.claimSource) if !cache.WaitForCacheSync(stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced) {
go ctrl.volumeController.Run(stopCh) utilruntime.HandleError(fmt.Errorf("timed out waiting for volume caches to sync"))
go ctrl.claimController.Run(stopCh) return
go ctrl.classReflector.RunUntil(stopCh) }
ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)
go wait.Until(ctrl.volumeWorker, time.Second, stopCh) go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
go wait.Until(ctrl.claimWorker, time.Second, stopCh) go wait.Until(ctrl.claimWorker, time.Second, stopCh)
@ -351,27 +301,26 @@ func (ctrl *PersistentVolumeController) volumeWorker() {
key := keyObj.(string) key := keyObj.(string)
glog.V(5).Infof("volumeWorker[%s]", key) glog.V(5).Infof("volumeWorker[%s]", key)
volumeObj, found, err := ctrl.volumeInformer.GetByKey(key) _, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
glog.V(2).Infof("error getting volume %q from informer: %v", key, err) glog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err)
return false return false
} }
volume, err := ctrl.volumeLister.Get(name)
if found { if err == nil {
// The volume still exists in informer cache, the event must have // The volume still exists in informer cache, the event must have
// been add/update/sync // been add/update/sync
volume, ok := volumeObj.(*v1.PersistentVolume) ctrl.updateVolume(volume)
if !ok {
glog.Errorf("expected volume, got %+v", volumeObj)
return false return false
} }
ctrl.updateVolume(volume) if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting volume %q from informer: %v", key, err)
return false return false
} }
// The volume is not in informer cache, the event must have been // The volume is not in informer cache, the event must have been
// "delete" // "delete"
volumeObj, found, err = ctrl.volumes.store.GetByKey(key) volumeObj, found, err := ctrl.volumes.store.GetByKey(key)
if err != nil { if err != nil {
glog.V(2).Infof("error getting volume %q from cache: %v", key, err) glog.V(2).Infof("error getting volume %q from cache: %v", key, err)
return false return false
@ -410,26 +359,25 @@ func (ctrl *PersistentVolumeController) claimWorker() {
key := keyObj.(string) key := keyObj.(string)
glog.V(5).Infof("claimWorker[%s]", key) glog.V(5).Infof("claimWorker[%s]", key)
claimObj, found, err := ctrl.claimInformer.GetByKey(key) namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
glog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
return false
}
claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
if err == nil {
// The claim still exists in informer cache, the event must have
// been add/update/sync
ctrl.updateClaim(claim)
return false
}
if !errors.IsNotFound(err) {
glog.V(2).Infof("error getting claim %q from informer: %v", key, err) glog.V(2).Infof("error getting claim %q from informer: %v", key, err)
return false return false
} }
if found {
// The claim still exists in informer cache, the event must have
// been add/update/sync
claim, ok := claimObj.(*v1.PersistentVolumeClaim)
if !ok {
glog.Errorf("expected claim, got %+v", claimObj)
return false
}
ctrl.updateClaim(claim)
return false
}
// The claim is not in informer cache, the event must have been "delete" // The claim is not in informer cache, the event must have been "delete"
claimObj, found, err = ctrl.claims.GetByKey(key) claimObj, found, err := ctrl.claims.GetByKey(key)
if err != nil { if err != nil {
glog.V(2).Infof("error getting claim %q from cache: %v", key, err) glog.V(2).Infof("error getting claim %q from cache: %v", key, err)
return false return false

View File

@ -21,10 +21,13 @@ import (
"time" "time"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
fcache "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/testing" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
"k8s.io/kubernetes/pkg/controller"
) )
// Test the real controller methods (add/update/delete claim/volume) with // Test the real controller methods (add/update/delete claim/volume) with
@ -161,26 +164,38 @@ func TestControllerSync(t *testing.T) {
// Initialize the controller // Initialize the controller
client := &fake.Clientset{} client := &fake.Clientset{}
volumeSource := fcache.NewFakePVControllerSource()
claimSource := fcache.NewFakePVCControllerSource() fakeVolumeWatch := watch.NewFake()
ctrl := newTestController(client, volumeSource, claimSource, nil, true) client.PrependWatchReactor("persistentvolumes", core.DefaultWatchReactor(fakeVolumeWatch, nil))
reactor := newVolumeReactor(client, ctrl, volumeSource, claimSource, test.errors) fakeClaimWatch := watch.NewFake()
client.PrependWatchReactor("persistentvolumeclaims", core.DefaultWatchReactor(fakeClaimWatch, nil))
client.PrependWatchReactor("storageclasses", core.DefaultWatchReactor(watch.NewFake(), nil))
informers := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
ctrl := newTestController(client, informers, true)
reactor := newVolumeReactor(client, ctrl, fakeVolumeWatch, fakeClaimWatch, test.errors)
for _, claim := range test.initialClaims { for _, claim := range test.initialClaims {
claimSource.Add(claim)
reactor.claims[claim.Name] = claim reactor.claims[claim.Name] = claim
go func(claim *v1.PersistentVolumeClaim) {
fakeClaimWatch.Add(claim)
}(claim)
} }
for _, volume := range test.initialVolumes { for _, volume := range test.initialVolumes {
volumeSource.Add(volume)
reactor.volumes[volume.Name] = volume reactor.volumes[volume.Name] = volume
go func(volume *v1.PersistentVolume) {
fakeVolumeWatch.Add(volume)
}(volume)
} }
// Start the controller // Start the controller
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go ctrl.Run(stopCh) go ctrl.Run(stopCh)
// Wait for the controller to pass initial sync and fill its caches. // Wait for the controller to pass initial sync and fill its caches.
for !ctrl.volumeController.HasSynced() || for !ctrl.volumeListerSynced() ||
!ctrl.claimController.HasSynced() || !ctrl.claimListerSynced() ||
len(ctrl.claims.ListKeys()) < len(test.initialClaims) || len(ctrl.claims.ListKeys()) < len(test.initialClaims) ||
len(ctrl.volumes.store.ListKeys()) < len(test.initialVolumes) { len(ctrl.volumes.store.ListKeys()) < len(test.initialVolumes) {

View File

@ -1,50 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = ["fake_controller_source_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/watch",
],
)
go_library(
name = "go_default_library",
srcs = ["fake_controller_source.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/api/meta",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/runtime",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/watch",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,264 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"errors"
"math/rand"
"strconv"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)
func NewFakeControllerSource() *FakeControllerSource {
return &FakeControllerSource{
Items: map[nnu]runtime.Object{},
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}
}
func NewFakePVControllerSource() *FakePVControllerSource {
return &FakePVControllerSource{
FakeControllerSource{
Items: map[nnu]runtime.Object{},
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}}
}
func NewFakePVCControllerSource() *FakePVCControllerSource {
return &FakePVCControllerSource{
FakeControllerSource{
Items: map[nnu]runtime.Object{},
Broadcaster: watch.NewBroadcaster(100, watch.WaitIfChannelFull),
}}
}
// FakeControllerSource implements listing/watching for testing.
type FakeControllerSource struct {
lock sync.RWMutex
Items map[nnu]runtime.Object
changes []watch.Event // one change per resourceVersion
Broadcaster *watch.Broadcaster
}
type FakePVControllerSource struct {
FakeControllerSource
}
type FakePVCControllerSource struct {
FakeControllerSource
}
// namespace, name, uid to be used as a key.
type nnu struct {
namespace, name string
uid types.UID
}
// Add adds an object to the set and sends an add event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Add(obj runtime.Object) {
f.Change(watch.Event{Type: watch.Added, Object: obj}, 1)
}
// Modify updates an object in the set and sends a modified event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Modify(obj runtime.Object) {
f.Change(watch.Event{Type: watch.Modified, Object: obj}, 1)
}
// Delete deletes an object from the set and sends a delete event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) Delete(lastValue runtime.Object) {
f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 1)
}
// AddDropWatch adds an object to the set but forgets to send an add event to
// watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) AddDropWatch(obj runtime.Object) {
f.Change(watch.Event{Type: watch.Added, Object: obj}, 0)
}
// ModifyDropWatch updates an object in the set but forgets to send a modify
// event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) ModifyDropWatch(obj runtime.Object) {
f.Change(watch.Event{Type: watch.Modified, Object: obj}, 0)
}
// DeleteDropWatch deletes an object from the set but forgets to send a delete
// event to watchers.
// obj's ResourceVersion is set.
func (f *FakeControllerSource) DeleteDropWatch(lastValue runtime.Object) {
f.Change(watch.Event{Type: watch.Deleted, Object: lastValue}, 0)
}
func (f *FakeControllerSource) key(accessor metav1.Object) nnu {
return nnu{accessor.GetNamespace(), accessor.GetName(), accessor.GetUID()}
}
// Change records the given event (setting the object's resource version) and
// sends a watch event with the specified probability.
func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
f.lock.Lock()
defer f.lock.Unlock()
accessor, err := meta.Accessor(e.Object)
if err != nil {
panic(err) // this is test code only
}
resourceVersion := len(f.changes) + 1
accessor.SetResourceVersion(strconv.Itoa(resourceVersion))
f.changes = append(f.changes, e)
key := f.key(accessor)
switch e.Type {
case watch.Added, watch.Modified:
f.Items[key] = e.Object
case watch.Deleted:
delete(f.Items, key)
}
if rand.Float64() < watchProbability {
f.Broadcaster.Action(e.Type, e.Object)
}
}
func (f *FakeControllerSource) getListItemsLocked() ([]runtime.Object, error) {
list := make([]runtime.Object, 0, len(f.Items))
for _, obj := range f.Items {
// Must make a copy to allow clients to modify the object.
// Otherwise, if they make a change and write it back, they
// will inadvertently change our canonical copy (in
// addition to racing with other clients).
objCopy, err := api.Scheme.DeepCopy(obj)
if err != nil {
return nil, err
}
list = append(list, objCopy.(runtime.Object))
}
return list, nil
}
// List returns a list object, with its resource version set.
func (f *FakeControllerSource) List(options metav1.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list, err := f.getListItemsLocked()
if err != nil {
return nil, err
}
listObj := &api.List{}
if err := meta.SetList(listObj, list); err != nil {
return nil, err
}
objMeta, err := metav1.ListMetaFor(listObj)
if err != nil {
return nil, err
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
return listObj, nil
}
// List returns a list object, with its resource version set.
func (f *FakePVControllerSource) List(options metav1.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list, err := f.FakeControllerSource.getListItemsLocked()
if err != nil {
return nil, err
}
listObj := &v1.PersistentVolumeList{}
if err := meta.SetList(listObj, list); err != nil {
return nil, err
}
objMeta, err := metav1.ListMetaFor(listObj)
if err != nil {
return nil, err
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
return listObj, nil
}
// List returns a list object, with its resource version set.
func (f *FakePVCControllerSource) List(options metav1.ListOptions) (runtime.Object, error) {
f.lock.RLock()
defer f.lock.RUnlock()
list, err := f.FakeControllerSource.getListItemsLocked()
if err != nil {
return nil, err
}
listObj := &v1.PersistentVolumeClaimList{}
if err := meta.SetList(listObj, list); err != nil {
return nil, err
}
objMeta, err := metav1.ListMetaFor(listObj)
if err != nil {
return nil, err
}
resourceVersion := len(f.changes)
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
return listObj, nil
}
// Watch returns a watch, which will be pre-populated with all changes
// after resourceVersion.
func (f *FakeControllerSource) Watch(options metav1.ListOptions) (watch.Interface, error) {
f.lock.RLock()
defer f.lock.RUnlock()
rc, err := strconv.Atoi(options.ResourceVersion)
if err != nil {
return nil, err
}
if rc < len(f.changes) {
changes := []watch.Event{}
for _, c := range f.changes[rc:] {
// Must make a copy to allow clients to modify the
// object. Otherwise, if they make a change and write
// it back, they will inadvertently change the our
// canonical copy (in addition to racing with other
// clients).
objCopy, err := api.Scheme.DeepCopy(c.Object)
if err != nil {
return nil, err
}
changes = append(changes, watch.Event{Type: c.Type, Object: objCopy.(runtime.Object)})
}
return f.Broadcaster.WatchWithPrefix(changes), nil
} else if rc > len(f.changes) {
return nil, errors.New("resource version in the future not supported by this fake")
}
return f.Broadcaster.Watch(), nil
}
// Shutdown closes the underlying broadcaster, waiting for events to be
// delivered. It's an error to call any method after calling shutdown. This is
// enforced by Shutdown() leaving f locked.
func (f *FakeControllerSource) Shutdown() {
f.lock.Lock() // Purposely no unlock.
f.Broadcaster.Shutdown()
}

View File

@ -1,96 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framework
import (
"sync"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
)
// ensure the watch delivers the requested and only the requested items.
func consume(t *testing.T, w watch.Interface, rvs []string, done *sync.WaitGroup) {
defer done.Done()
for _, rv := range rvs {
got, ok := <-w.ResultChan()
if !ok {
t.Errorf("%#v: unexpected channel close, wanted %v", rvs, rv)
return
}
gotRV := got.Object.(*v1.Pod).ObjectMeta.ResourceVersion
if e, a := rv, gotRV; e != a {
t.Errorf("wanted %v, got %v", e, a)
} else {
t.Logf("Got %v as expected", gotRV)
}
}
// We should not get anything else.
got, open := <-w.ResultChan()
if open {
t.Errorf("%#v: unwanted object %#v", rvs, got)
}
}
func TestRCNumber(t *testing.T) {
pod := func(name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
}
wg := &sync.WaitGroup{}
wg.Add(3)
source := NewFakeControllerSource()
source.Add(pod("foo"))
source.Modify(pod("foo"))
source.Modify(pod("foo"))
w, err := source.Watch(metav1.ListOptions{ResourceVersion: "1"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
go consume(t, w, []string{"2", "3"}, wg)
list, err := source.List(metav1.ListOptions{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if e, a := "3", list.(*api.List).ResourceVersion; e != a {
t.Errorf("wanted %v, got %v", e, a)
}
w2, err := source.Watch(metav1.ListOptions{ResourceVersion: "2"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
go consume(t, w2, []string{"3"}, wg)
w3, err := source.Watch(metav1.ListOptions{ResourceVersion: "3"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
go consume(t, w3, []string{}, wg)
source.Shutdown()
wg.Wait()
}

View File

@ -314,6 +314,7 @@ func ClusterRoles() []rbac.ClusterRole {
rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(), rbac.NewRule("list", "watch").Groups(policyGroup).Resources("poddisruptionbudgets").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(), rbac.NewRule("list", "watch").Groups(autoscalingGroup).Resources("horizontalpodautoscalers").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(certificatesGroup).Resources("certificatesigningrequests").RuleOrDie(), rbac.NewRule("list", "watch").Groups(certificatesGroup).Resources("certificatesigningrequests").RuleOrDie(),
rbac.NewRule("list", "watch").Groups(storageGroup).Resources("storageclasses").RuleOrDie(),
}, },
}, },
{ {

View File

@ -523,6 +523,13 @@ items:
verbs: verbs:
- list - list
- watch - watch
- apiGroups:
- storage.k8s.io
resources:
- storageclasses
verbs:
- list
- watch
- apiVersion: rbac.authorization.k8s.io/v1beta1 - apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole kind: ClusterRole
metadata: metadata:

View File

@ -36,6 +36,7 @@ import (
storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1" storage "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util" storageutil "k8s.io/kubernetes/pkg/apis/storage/v1beta1/util"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume" persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
@ -110,7 +111,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
ns := framework.CreateTestingNamespace("pv-recycler", s, t) ns := framework.CreateTestingNamespace("pv-recycler", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, ctrl, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, ctrl, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -119,6 +120,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go ctrl.Run(stopCh) go ctrl.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -164,7 +166,7 @@ func TestPersistentVolumeDeleter(t *testing.T) {
ns := framework.CreateTestingNamespace("pv-deleter", s, t) ns := framework.CreateTestingNamespace("pv-deleter", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, ctrl, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, ctrl, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -173,6 +175,7 @@ func TestPersistentVolumeDeleter(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go ctrl.Run(stopCh) go ctrl.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -223,7 +226,7 @@ func TestPersistentVolumeBindRace(t *testing.T) {
ns := framework.CreateTestingNamespace("pv-bind-race", s, t) ns := framework.CreateTestingNamespace("pv-bind-race", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, ctrl, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, ctrl, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -232,6 +235,7 @@ func TestPersistentVolumeBindRace(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go ctrl.Run(stopCh) go ctrl.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -294,7 +298,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
ns := framework.CreateTestingNamespace("pvc-label-selector", s, t) ns := framework.CreateTestingNamespace("pvc-label-selector", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -303,6 +307,7 @@ func TestPersistentVolumeClaimLabelSelector(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go controller.Run(stopCh) go controller.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -374,7 +379,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
ns := framework.CreateTestingNamespace("pvc-match-expressions", s, t) ns := framework.CreateTestingNamespace("pvc-match-expressions", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -383,6 +388,7 @@ func TestPersistentVolumeClaimLabelSelectorMatchExpressions(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go controller.Run(stopCh) go controller.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -473,7 +479,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
ns := framework.CreateTestingNamespace("multi-pvs", s, t) ns := framework.CreateTestingNamespace("multi-pvs", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -482,6 +488,7 @@ func TestPersistentVolumeMultiPVs(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go controller.Run(stopCh) go controller.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -562,7 +569,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
ns := framework.CreateTestingNamespace("multi-pvs-pvcs", s, t) ns := framework.CreateTestingNamespace("multi-pvs-pvcs", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, binder, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, binder, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -571,6 +578,7 @@ func TestPersistentVolumeMultiPVsPVCs(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
controllerStopCh := make(chan struct{}) controllerStopCh := make(chan struct{})
informers.Start(controllerStopCh)
go binder.Run(controllerStopCh) go binder.Run(controllerStopCh)
defer close(controllerStopCh) defer close(controllerStopCh)
@ -727,7 +735,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
const shortSyncPeriod = 2 * time.Second const shortSyncPeriod = 2 * time.Second
syncPeriod := getSyncPeriod(shortSyncPeriod) syncPeriod := getSyncPeriod(shortSyncPeriod)
testClient, binder, watchPV, watchPVC := createClients(ns, t, s, shortSyncPeriod) testClient, binder, informers, watchPV, watchPVC := createClients(ns, t, s, shortSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -784,6 +792,7 @@ func TestPersistentVolumeControllerStartup(t *testing.T) {
// Start the controller when all PVs and PVCs are already saved in etcd // Start the controller when all PVs and PVCs are already saved in etcd
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go binder.Run(stopCh) go binder.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -850,7 +859,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
ns := framework.CreateTestingNamespace("provision-multi-pvs", s, t) ns := framework.CreateTestingNamespace("provision-multi-pvs", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, binder, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, binder, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -871,6 +880,7 @@ func TestPersistentVolumeProvisionMultiPVCs(t *testing.T) {
testClient.Storage().StorageClasses().Create(&storageClass) testClient.Storage().StorageClasses().Create(&storageClass)
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go binder.Run(stopCh) go binder.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -947,7 +957,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
ns := framework.CreateTestingNamespace("multi-pvs-diff-access", s, t) ns := framework.CreateTestingNamespace("multi-pvs-diff-access", s, t)
defer framework.DeleteTestingNamespace(ns, s, t) defer framework.DeleteTestingNamespace(ns, s, t)
testClient, controller, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod) testClient, controller, informers, watchPV, watchPVC := createClients(ns, t, s, defaultSyncPeriod)
defer watchPV.Stop() defer watchPV.Stop()
defer watchPVC.Stop() defer watchPVC.Stop()
@ -956,6 +966,7 @@ func TestPersistentVolumeMultiPVsDiffAccessModes(t *testing.T) {
defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{}) defer testClient.Core().PersistentVolumes().DeleteCollection(nil, metav1.ListOptions{})
stopCh := make(chan struct{}) stopCh := make(chan struct{})
informers.Start(stopCh)
go controller.Run(stopCh) go controller.Run(stopCh)
defer close(stopCh) defer close(stopCh)
@ -1088,7 +1099,7 @@ func waitForAnyPersistentVolumeClaimPhase(w watch.Interface, phase v1.Persistent
} }
} }
func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPeriod time.Duration) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, watch.Interface, watch.Interface) { func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPeriod time.Duration) (*clientset.Clientset, *persistentvolumecontroller.PersistentVolumeController, informers.SharedInformerFactory, watch.Interface, watch.Interface) {
// Use higher QPS and Burst, there is a test for race conditions which // Use higher QPS and Burst, there is a test for race conditions which
// creates many objects and default values were too low. // creates many objects and default values were too low.
binderClient := clientset.NewForConfigOrDie(&restclient.Config{ binderClient := clientset.NewForConfigOrDie(&restclient.Config{
@ -1119,12 +1130,16 @@ func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPerio
} }
plugins := []volume.VolumePlugin{plugin} plugins := []volume.VolumePlugin{plugin}
cloud := &fakecloud.FakeCloud{} cloud := &fakecloud.FakeCloud{}
informers := informers.NewSharedInformerFactory(testClient, getSyncPeriod(syncPeriod))
ctrl := persistentvolumecontroller.NewController( ctrl := persistentvolumecontroller.NewController(
persistentvolumecontroller.ControllerParameters{ persistentvolumecontroller.ControllerParameters{
KubeClient: binderClient, KubeClient: binderClient,
SyncPeriod: getSyncPeriod(syncPeriod), SyncPeriod: getSyncPeriod(syncPeriod),
VolumePlugins: plugins, VolumePlugins: plugins,
Cloud: cloud, Cloud: cloud,
VolumeInformer: informers.Core().V1().PersistentVolumes(),
ClaimInformer: informers.Core().V1().PersistentVolumeClaims(),
ClassInformer: informers.Storage().V1beta1().StorageClasses(),
EnableDynamicProvisioning: true, EnableDynamicProvisioning: true,
}) })
@ -1137,7 +1152,7 @@ func createClients(ns *v1.Namespace, t *testing.T, s *httptest.Server, syncPerio
t.Fatalf("Failed to watch PersistentVolumeClaims: %v", err) t.Fatalf("Failed to watch PersistentVolumeClaims: %v", err)
} }
return testClient, ctrl, watchPV, watchPVC return testClient, ctrl, informers, watchPV, watchPVC
} }
func createPV(name, path, cap string, mode []v1.PersistentVolumeAccessMode, reclaim v1.PersistentVolumeReclaimPolicy) *v1.PersistentVolume { func createPV(name, path, cap string, mode []v1.PersistentVolumeAccessMode, reclaim v1.PersistentVolumeReclaimPolicy) *v1.PersistentVolume {