Added PersistentVolumeController

pull/6/head
markturansky 2015-10-12 14:27:49 -04:00
parent d3243b8778
commit 4fc1bf1f23
20 changed files with 1195 additions and 129 deletions

View File

@ -137,6 +137,7 @@ func NewCMServer() *CMServer {
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
PersistentVolumeRecyclerIncrementTimeoutHostPath: 30,
EnableHostPathProvisioning: false,
},
KubeAPIQPS: 20.0,
KubeAPIBurst: 30,
@ -176,6 +177,7 @@ type VolumeConfigFlags struct {
PersistentVolumeRecyclerPodTemplateFilePathHostPath string
PersistentVolumeRecyclerMinimumTimeoutHostPath int
PersistentVolumeRecyclerIncrementTimeoutHostPath int
EnableHostPathProvisioning bool
}
// AddFlags adds flags for a specific CMServer to the specified FlagSet
@ -201,6 +203,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.BoolVar(&s.VolumeConfigFlags.EnableHostPathProvisioning, "enable-hostpath-provisioner", s.VolumeConfigFlags.EnableHostPathProvisioning, "Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.")
fs.IntVar(&s.TerminatedPodGCThreshold, "terminated-pod-gc-threshold", s.TerminatedPodGCThreshold, "Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.")
fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.")
fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.")
@ -385,15 +388,29 @@ func (s *CMServer) Run(_ []string) error {
}
}
volumePlugins := ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
provisioner, err := NewVolumeProvisioner(cloud, s.VolumeConfigFlags)
if err != nil {
glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
}
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-binder"), s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-recycler"), s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
if provisioner != nil {
pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(clientForUserAgentOrDie(*kubeconfig, "persistent-volume-provisioner")), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
}
pvController.Run()
}
var rootCA []byte
if s.RootCAFile != "" {

View File

@ -21,12 +21,18 @@ import (
// This should probably be part of some configuration fed into the build for a
// given binary target.
"fmt"
//Cloud providers
_ "k8s.io/kubernetes/pkg/cloudprovider/providers"
// Volume plugins
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/aws_ebs"
"k8s.io/kubernetes/pkg/volume/cinder"
"k8s.io/kubernetes/pkg/volume/gce_pd"
"k8s.io/kubernetes/pkg/volume/host_path"
"k8s.io/kubernetes/pkg/volume/nfs"
@ -51,7 +57,7 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin
RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutHostPath,
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
}
if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil {
if err := AttemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, &hostPathConfig); err != nil {
glog.Fatalf("Could not create hostpath recycler pod from file %s: %+v", flags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, err)
}
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins(hostPathConfig)...)
@ -61,18 +67,49 @@ func ProbeRecyclableVolumePlugins(flags VolumeConfigFlags) []volume.VolumePlugin
RecyclerTimeoutIncrement: flags.PersistentVolumeRecyclerIncrementTimeoutNFS,
RecyclerPodTemplate: volume.NewPersistentVolumeRecyclerPodTemplate(),
}
if err := attemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil {
if err := AttemptToLoadRecycler(flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, &nfsConfig); err != nil {
glog.Fatalf("Could not create NFS recycler pod from file %s: %+v", flags.PersistentVolumeRecyclerPodTemplateFilePathNFS, err)
}
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins(nfsConfig)...)
allPlugins = append(allPlugins, aws_ebs.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, gce_pd.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, cinder.ProbeVolumePlugins()...)
return allPlugins
}
// attemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume.
// NewVolumeProvisioner returns a volume provisioner to use when running in a cloud or development environment.
// The beta implementation of provisioning allows 1 implied provisioner per cloud, until we allow configuration of many.
// We explicitly map clouds to volume plugins here which allows us to configure many later without backwards compatibility issues.
// Not all cloudproviders have provisioning capability, which is the reason for the bool in the return to tell the caller to expect one or not.
func NewVolumeProvisioner(cloud cloudprovider.Interface, flags VolumeConfigFlags) (volume.ProvisionableVolumePlugin, error) {
switch {
case cloud == nil && flags.EnableHostPathProvisioning:
return getProvisionablePluginFromVolumePlugins(host_path.ProbeVolumePlugins(volume.VolumeConfig{}))
// case cloud != nil && aws.ProviderName == cloud.ProviderName():
// return getProvisionablePluginFromVolumePlugins(aws_ebs.ProbeVolumePlugins())
// case cloud != nil && gce.ProviderName == cloud.ProviderName():
// return getProvisionablePluginFromVolumePlugins(gce_pd.ProbeVolumePlugins())
// case cloud != nil && openstack.ProviderName == cloud.ProviderName():
// return getProvisionablePluginFromVolumePlugins(cinder.ProbeVolumePlugins())
}
return nil, nil
}
func getProvisionablePluginFromVolumePlugins(plugins []volume.VolumePlugin) (volume.ProvisionableVolumePlugin, error) {
for _, plugin := range plugins {
if provisonablePlugin, ok := plugin.(volume.ProvisionableVolumePlugin); ok {
return provisonablePlugin, nil
}
}
return nil, fmt.Errorf("ProvisionablePlugin expected but not found in %#v: ", plugins)
}
// AttemptToLoadRecycler tries decoding a pod from a filepath for use as a recycler for a volume.
// If successful, this method will set the recycler on the config.
// If unsucessful, an error is returned.
func attemptToLoadRecycler(path string, config *volume.VolumeConfig) error {
// If unsuccessful, an error is returned. Function is exported for reuse downstream.
func AttemptToLoadRecycler(path string, config *volume.VolumeConfig) error {
if path != "" {
recyclerPod, err := io.LoadPodFromFile(path)
if err != nil {

View File

@ -25,7 +25,7 @@ import (
"strconv"
"time"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
kubecontrollermanager "k8s.io/kubernetes/cmd/kube-controller-manager/app"
"k8s.io/kubernetes/contrib/mesos/pkg/node"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -57,14 +57,14 @@ import (
// CMServer is the main context object for the controller manager.
type CMServer struct {
*app.CMServer
*kubecontrollermanager.CMServer
UseHostPortEndpoints bool
}
// NewCMServer creates a new CMServer with a default config.
func NewCMServer() *CMServer {
s := &CMServer{
CMServer: app.NewCMServer(),
CMServer: kubecontrollermanager.NewCMServer(),
}
s.CloudProvider = mesos.ProviderName
s.UseHostPortEndpoints = true
@ -167,14 +167,29 @@ func (s *CMServer) Run(_ []string) error {
namespaceController := namespacecontroller.NewNamespaceController(kubeClient, &unversioned.APIVersions{}, s.NamespaceSyncPeriod)
namespaceController.Run()
volumePlugins := kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags)
provisioner, err := kubecontrollermanager.NewVolumeProvisioner(cloud, s.VolumeConfigFlags)
if err != nil {
glog.Fatal("A Provisioner could not be created, but one was expected. Provisioning will not work. This functionality is considered an early Alpha version.")
}
pvclaimBinder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, app.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags))
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, kubecontrollermanager.ProbeRecyclableVolumePlugins(s.VolumeConfigFlags), cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
if provisioner != nil {
pvController, err := persistentvolumecontroller.NewPersistentVolumeProvisionerController(persistentvolumecontroller.NewControllerClient(kubeClient), s.PVClaimBinderSyncPeriod, volumePlugins, provisioner, cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume provisioner controller: %+v", err)
}
pvController.Run()
}
var rootCA []byte
if s.RootCAFile != "" {

View File

@ -66,6 +66,7 @@ kube-controller-manager
--deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.
--deleting-pods-qps=0.1: Number of nodes per second on which pods are deleted in case of node failure.
--deployment-controller-sync-period=30s: Period for syncing the deployments.
--enable-hostpath-provisioner[=false]: Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.
--google-json-key="": The Google Cloud Platform Service Account JSON Key to use for authentication.
--horizontal-pod-autoscaler-sync-period=30s: The period for syncing the number of pods in horizontal pod autoscaler.
--kube-api-burst=30: Burst to use while talking with kubernetes apiserver
@ -96,7 +97,7 @@ kube-controller-manager
--terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.
```
###### Auto generated by spf13/cobra on 30-Nov-2015
###### Auto generated by spf13/cobra on 9-Dec-2015
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->

View File

@ -0,0 +1,124 @@
<!-- BEGIN MUNGE: UNVERSIONED_WARNING -->
<!-- BEGIN STRIP_FOR_RELEASE -->
<img src="http://kubernetes.io/img/warning.png" alt="WARNING"
width="25" height="25">
<img src="http://kubernetes.io/img/warning.png" alt="WARNING"
width="25" height="25">
<img src="http://kubernetes.io/img/warning.png" alt="WARNING"
width="25" height="25">
<img src="http://kubernetes.io/img/warning.png" alt="WARNING"
width="25" height="25">
<img src="http://kubernetes.io/img/warning.png" alt="WARNING"
width="25" height="25">
<h2>PLEASE NOTE: This document applies to the HEAD of the source tree</h2>
If you are using a released version of Kubernetes, you should
refer to the docs that go with that version.
<strong>
The latest release of this document can be found
[here](http://releases.k8s.io/release-1.1/examples/experimental/persistent-volume-provisioning/README.md).
Documentation for other releases can be found at
[releases.k8s.io](http://releases.k8s.io).
</strong>
--
<!-- END STRIP_FOR_RELEASE -->
<!-- END MUNGE: UNVERSIONED_WARNING -->
## Persistent Volume Provisioning
This example shows how to use experimental persistent volume provisioning.
### Pre-requisites
This example assumes that you have an understanding of Kubernetes administration and can modify the
scripts that launch kube-controller-manager.
### Admin Configuration
No configuration is required by the admin! 3 cloud providers will be provided in the alpha version
of this feature: EBS, GCE, and Cinder.
When Kubernetes is running in one of those clouds, there will be an implied provisioner.
There is no provisioner when running outside of any of those 3 cloud providers.
A fourth provisioner is included for testing and development only. It creates HostPath volumes,
which will never work outside of a single node cluster. It is not supported in any way except for
local for testing and development.
### User provisioning requests
Users request dynamically provisioned storage by including a storage class in their `PersistentVolumeClaim`.
The annotation `volume.alpha.kubernetes.io/storage-class` is used to access this experimental feature.
In the future, admins will be able to define many storage classes.
The storage class may remain in an annotation or become a field on the claim itself.
> The value of the storage-class annotation does not matter in the alpha version of this feature. There is
a single implied provisioner per cloud (which creates 1 kind of volume in the provider). The full version of the feature
will require that this value matches what is configured by the administrator.
```
{
"kind": "PersistentVolumeClaim",
"apiVersion": "v1",
"metadata": {
"name": "claim1",
"annotations": {
"volume.alpha.kubernetes.io/storage-class": "foo"
}
},
"spec": {
"accessModes": [
"ReadWriteOnce"
],
"resources": {
"requests": {
"storage": "3Gi"
}
}
}
}
```
### Sample output
This example uses HostPath but any provisioner would follow the same flow.
First we note there are no Persistent Volumes in the cluster. After creating a claim, we see a new PV is created
and automatically bound to the claim requesting storage.
```
$ kubectl get pv
$ kubectl create -f examples/experimental/persistent-volume-provisioning/claim1.json
I1012 13:07:57.666759 22875 decoder.go:141] decoding stream as JSON
persistentvolumeclaim "claim1" created
$ kubectl get pv
NAME LABELS CAPACITY ACCESSMODES STATUS CLAIM REASON AGE
pv-hostpath-r6z5o createdby=hostpath-dynamic-provisioner 3Gi RWO Bound default/claim1 2s
$ kubectl get pvc
NAME LABELS STATUS VOLUME CAPACITY ACCESSMODES AGE
claim1 <none> Bound pv-hostpath-r6z5o 3Gi RWO 7s
# delete the claim to release the volume
$ kubectl delete pvc claim1
persistentvolumeclaim "claim1" deleted
# the volume is deleted in response to being release of its claim
$ kubectl get pv
```
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/experimental/persistent-volume-provisioning/README.md?pixel)]()
<!-- END MUNGE: GENERATED_ANALYTICS -->

View File

@ -0,0 +1,20 @@
{
"kind": "PersistentVolumeClaim",
"apiVersion": "v1",
"metadata": {
"name": "claim1",
"annotations": {
"volume.alpha.kubernetes.io/storage-class": "foo"
}
},
"spec": {
"accessModes": [
"ReadWriteOnce"
],
"resources": {
"requests": {
"storage": "3Gi"
}
}
}
}

View File

@ -0,0 +1,20 @@
{
"kind": "PersistentVolumeClaim",
"apiVersion": "v1",
"metadata": {
"name": "claim2",
"annotations": {
"volume.alpha.kubernetes.io/storage-class": "bar"
}
},
"spec": {
"accessModes": [
"ReadWriteOnce"
],
"resources": {
"requests": {
"storage": "3Gi"
}
}
}
}

View File

@ -91,6 +91,7 @@ RKT_PATH=${RKT_PATH:-""}
RKT_STAGE1_IMAGE=${RKT_STAGE1_IMAGE:-""}
CHAOS_CHANCE=${CHAOS_CHANCE:-0.0}
CPU_CFS_QUOTA=${CPU_CFS_QUOTA:-false}
ENABLE_HOSTPATH_PROVISIONER=${ENABLE_HOSTPATH_PROVISIONER:-"false"}
function test_apiserver_off {
# For the common local scenario, fail fast if server is already running.
@ -250,6 +251,7 @@ function start_controller_manager {
--v=${LOG_LEVEL} \
--service-account-private-key-file="${SERVICE_ACCOUNT_KEY}" \
--root-ca-file="${ROOT_CA_FILE}" \
--enable-hostpath-provisioner="${ENABLE_HOSTPATH_PROVISIONER}" \
--master="${API_HOST}:${API_PORT}" >"${CTLRMGR_LOG}" 2>&1 &
CTLRMGR_PID=$!
}

View File

@ -78,6 +78,7 @@ e2e-output-dir
e2e-verify-service-account
enable-debugging-handlers
enable-server
enable-hostpath-provisioner
etcd-config
etcd-prefix
etcd-server

View File

@ -86,8 +86,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addClaim,
UpdateFunc: binder.updateClaim,
// no DeleteFunc needed. a claim requires no clean-up.
// syncVolume handles the missing claim
DeleteFunc: binder.deleteClaim,
},
)
@ -145,6 +144,33 @@ func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{
}
}
func (binder *PersistentVolumeClaimBinder) deleteClaim(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
var volume *api.PersistentVolume
if pvc, ok := obj.(*api.PersistentVolumeClaim); ok {
if pvObj, exists, _ := binder.volumeIndex.GetByKey(pvc.Spec.VolumeName); exists {
if pv, ok := pvObj.(*api.PersistentVolume); ok {
volume = pv
}
}
}
if unk, ok := obj.(cache.DeletedFinalStateUnknown); ok && unk.Obj != nil {
if pv, ok := unk.Obj.(*api.PersistentVolume); ok {
volume = pv
}
}
// sync the volume when its claim is deleted. Explicitly sync'ing the volume here in response to
// claim deletion prevents the volume from waiting until the next sync period for its Release.
if volume != nil {
err := syncVolume(binder.volumeIndex, binder.client, volume)
if err != nil {
glog.Errorf("PVClaimBinder could not update volume %s from deleteClaim handler: %+v", volume.Name, err)
}
}
}
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)
@ -166,6 +192,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
volumeIndex.Add(volume)
}
if isBeingProvisioned(volume) {
glog.V(4).Infof("Skipping PersistentVolume[%s], waiting for provisioning to finish", volume.Name)
return nil
}
switch currentPhase {
case api.VolumePending:
@ -275,38 +306,46 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
switch claim.Status.Phase {
case api.ClaimPending:
// claims w/ a storage-class annotation for provisioning with *only* match volumes with a ClaimRef of the claim.
volume, err := volumeIndex.findBestMatchForClaim(claim)
if err != nil {
return err
}
if volume == nil {
glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name)
return nil
}
// create a reference to the claim and assign it to the volume being bound.
// the volume is a pointer and assigning the reference fixes a race condition where another
// claim might match this volume but before the claimRef is persistent in the next case statement
if isBeingProvisioned(volume) {
glog.V(5).Infof("PersistentVolume[%s] for PersistentVolumeClaim[%s/%s] is still being provisioned.", volume.Name, claim.Namespace, claim.Name)
return nil
}
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// make a binding reference to the claim and ensure to update the local index to prevent dupe bindings
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Spec.ClaimRef = claimRef
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
} else {
volume = updatedVolume
volumeIndex.Update(updatedVolume)
// Make a binding reference to the claim by persisting claimRef on the volume.
// The local cache must be updated with the new bind to prevent subsequent
// claims from binding to the volume.
if volume.Spec.ClaimRef == nil {
clone, err := conversion.NewCloner().DeepCopy(volume)
if err != nil {
return fmt.Errorf("Error cloning pv: %v", err)
}
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Spec.ClaimRef = claimRef
if updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone); err != nil {
return fmt.Errorf("Unexpected error saving PersistentVolume.Status: %+v", err)
} else {
volume = updatedVolume
volumeIndex.Update(updatedVolume)
}
}
// the bind is persisted on the volume above and will always match the claim in a search.
@ -341,6 +380,14 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
return nil
}
func isBeingProvisioned(volume *api.PersistentVolume) bool {
value, found := volume.Annotations[pvProvisioningRequiredAnnotationKey]
if found && value != pvProvisioningCompletedAnnotationValue {
return true
}
return false
}
// Run starts all of this binder's control loops
func (controller *PersistentVolumeClaimBinder) Run() {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")

View File

@ -0,0 +1,498 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeProvisionerController reconciles the state of all PersistentVolumes and PersistentVolumeClaims.
type PersistentVolumeProvisionerController struct {
volumeController *framework.Controller
volumeStore cache.Store
claimController *framework.Controller
claimStore cache.Store
client controllerClient
cloud cloudprovider.Interface
provisioner volume.ProvisionableVolumePlugin
pluginMgr volume.VolumePluginMgr
stopChannels map[string]chan struct{}
mutex sync.RWMutex
}
// constant name values for the controllers stopChannels map.
// the controller uses these for graceful shutdown
const volumesStopChannel = "volumes"
const claimsStopChannel = "claims"
// NewPersistentVolumeProvisionerController creates a new PersistentVolumeProvisionerController
func NewPersistentVolumeProvisionerController(client controllerClient, syncPeriod time.Duration, plugins []volume.VolumePlugin, provisioner volume.ProvisionableVolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeProvisionerController, error) {
controller := &PersistentVolumeProvisionerController{
client: client,
cloud: cloud,
provisioner: provisioner,
}
if err := controller.pluginMgr.InitPlugins(plugins, controller); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolumeProvisionerController: %+v", err)
}
glog.V(5).Infof("Initializing provisioner: %s", controller.provisioner.Name())
controller.provisioner.Init(controller)
controller.volumeStore, controller.volumeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options unversioned.ListOptions) (runtime.Object, error) {
return client.ListPersistentVolumes(options)
},
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
return client.WatchPersistentVolumes(options)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: controller.handleAddVolume,
UpdateFunc: controller.handleUpdateVolume,
// delete handler not needed in this controller.
// volume deletion is handled by the recycler controller
},
)
controller.claimStore, controller.claimController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(options unversioned.ListOptions) (runtime.Object, error) {
return client.ListPersistentVolumeClaims(api.NamespaceAll, options)
},
WatchFunc: func(options unversioned.ListOptions) (watch.Interface, error) {
return client.WatchPersistentVolumeClaims(api.NamespaceAll, options)
},
},
&api.PersistentVolumeClaim{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: controller.handleAddClaim,
UpdateFunc: controller.handleUpdateClaim,
// delete handler not needed.
// normal recycling applies when a claim is deleted.
// recycling is handled by the binding controller.
},
)
return controller, nil
}
func (controller *PersistentVolumeProvisionerController) handleAddVolume(obj interface{}) {
controller.mutex.Lock()
defer controller.mutex.Unlock()
cachedPv, _, _ := controller.volumeStore.Get(obj)
if pv, ok := cachedPv.(*api.PersistentVolume); ok {
err := controller.reconcileVolume(pv)
if err != nil {
glog.Errorf("Error reconciling volume %s: %+v", pv.Name, err)
}
}
}
func (controller *PersistentVolumeProvisionerController) handleUpdateVolume(oldObj, newObj interface{}) {
// The flow for Update is the same as Add.
// A volume is only provisioned if not done so already.
controller.handleAddVolume(newObj)
}
func (controller *PersistentVolumeProvisionerController) handleAddClaim(obj interface{}) {
controller.mutex.Lock()
defer controller.mutex.Unlock()
cachedPvc, exists, _ := controller.claimStore.Get(obj)
if !exists {
glog.Errorf("PersistentVolumeClaim does not exist in the local cache: %+v", obj)
return
}
if pvc, ok := cachedPvc.(*api.PersistentVolumeClaim); ok {
err := controller.reconcileClaim(pvc)
if err != nil {
glog.Errorf("Error encoutered reconciling claim %s: %+v", pvc.Name, err)
}
}
}
func (controller *PersistentVolumeProvisionerController) handleUpdateClaim(oldObj, newObj interface{}) {
// The flow for Update is the same as Add.
// A volume is only provisioned for a claim if not done so already.
controller.handleAddClaim(newObj)
}
func (controller *PersistentVolumeProvisionerController) reconcileClaim(claim *api.PersistentVolumeClaim) error {
if controller.provisioner == nil {
return fmt.Errorf("No provisioner configured for controller")
}
// no provisioning requested, return Pending. Claim may be pending indefinitely without a match.
if !keyExists(qosProvisioningKey, claim.Annotations) {
glog.V(5).Infof("PersistentVolumeClaim[%s] no provisioning required", claim.Name)
return nil
}
if len(claim.Spec.VolumeName) != 0 {
glog.V(5).Infof("PersistentVolumeClaim[%s] already bound. No provisioning required", claim.Name)
return nil
}
if isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, claim.Annotations) {
glog.V(5).Infof("PersistentVolumeClaim[%s] is already provisioned.", claim.Name)
return nil
}
glog.V(5).Infof("PersistentVolumeClaim[%s] provisioning", claim.Name)
provisioner, err := newProvisioner(controller.provisioner, claim)
if err != nil {
return fmt.Errorf("Unexpected error getting new provisioner for claim %s: %v\n", claim.Name, err)
}
newVolume, err := provisioner.NewPersistentVolumeTemplate()
if err != nil {
return fmt.Errorf("Unexpected error getting new volume template for claim %s: %v\n", claim.Name, err)
}
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference for %s: %v\n", claim.Name, err)
}
storageClass, _ := claim.Annotations[qosProvisioningKey]
// the creation of this volume is the bind to the claim.
// The claim will match the volume during the next sync period when the volume is in the local cache
newVolume.Spec.ClaimRef = claimRef
newVolume.Annotations[pvProvisioningRequiredAnnotationKey] = "true"
newVolume.Annotations[qosProvisioningKey] = storageClass
newVolume, err = controller.client.CreatePersistentVolume(newVolume)
glog.V(5).Infof("Unprovisioned PersistentVolume[%s] created for PVC[%s], which will be fulfilled in the storage provider", newVolume.Name, claim.Name)
if err != nil {
return fmt.Errorf("PersistentVolumeClaim[%s] failed provisioning: %+v", claim.Name, err)
}
claim.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue
_, err = controller.client.UpdatePersistentVolumeClaim(claim)
if err != nil {
glog.Error("error updating persistent volume claim: %v", err)
}
return nil
}
func (controller *PersistentVolumeProvisionerController) reconcileVolume(pv *api.PersistentVolume) error {
glog.V(5).Infof("PersistentVolume[%s] reconciling", pv.Name)
if pv.Spec.ClaimRef == nil {
glog.V(5).Infof("PersistentVolume[%s] is not bound to a claim. No provisioning required", pv.Name)
return nil
}
// TODO: fix this leaky abstraction. Had to make our own store key because ClaimRef fails the default keyfunc (no Meta on object).
obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name))
if !exists {
return fmt.Errorf("PersistentVolumeClaim[%s/%s] not found in local cache", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
}
claim, ok := obj.(*api.PersistentVolumeClaim)
if !ok {
return fmt.Errorf("PersistentVolumeClaim expected, but got %v", obj)
}
// no provisioning required, volume is ready and Bound
if !keyExists(pvProvisioningRequiredAnnotationKey, pv.Annotations) {
glog.V(5).Infof("PersistentVolume[%s] does not require provisioning", pv.Name)
return nil
}
// provisioning is completed, volume is ready.
if isProvisioningComplete(pv) {
glog.V(5).Infof("PersistentVolume[%s] is bound and provisioning is complete", pv.Name)
if pv.Spec.ClaimRef.Namespace != claim.Namespace || pv.Spec.ClaimRef.Name != claim.Name {
return fmt.Errorf("pre-bind mismatch - expected %s but found %s/%s", claimToClaimKey(claim), pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
}
return nil
}
// provisioning is incomplete. Attempt to provision the volume.
glog.V(5).Infof("PersistentVolume[%s] provisioning in progress", pv.Name)
err := provisionVolume(pv, controller)
if err != nil {
return fmt.Errorf("Error provisioning PersistentVolume[%s]: %v", err)
}
return nil
}
// provisionVolume provisions a volume that has been created in the cluster but not yet fulfilled by
// the storage provider.
func provisionVolume(pv *api.PersistentVolume, controller *PersistentVolumeProvisionerController) error {
if isProvisioningComplete(pv) {
return fmt.Errorf("PersistentVolume[%s] is already provisioned", pv.Name)
}
if _, exists := pv.Annotations[qosProvisioningKey]; !exists {
return fmt.Errorf("PersistentVolume[%s] does not contain a provisioning request. Provisioning not required.", pv.Name)
}
if controller.provisioner == nil {
return fmt.Errorf("No provisioner found for volume: %s", pv.Name)
}
// Find the claim in local cache
obj, exists, _ := controller.claimStore.GetByKey(fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name))
if !exists {
return fmt.Errorf("Could not find PersistentVolumeClaim[%s/%s] in local cache", pv.Spec.ClaimRef.Name, pv.Name)
}
claim := obj.(*api.PersistentVolumeClaim)
provisioner, _ := newProvisioner(controller.provisioner, claim)
err := provisioner.Provision(pv)
if err != nil {
glog.Errorf("Could not provision %s", pv.Name)
pv.Status.Phase = api.VolumeFailed
pv.Status.Message = err.Error()
if pv, apiErr := controller.client.UpdatePersistentVolumeStatus(pv); apiErr != nil {
return fmt.Errorf("PersistentVolume[%s] failed provisioning and also failed status update: %v - %v", pv.Name, err, apiErr)
}
return fmt.Errorf("PersistentVolume[%s] failed provisioning : %v", pv.Name, err, err)
}
clone, err := conversion.NewCloner().DeepCopy(pv)
volumeClone, ok := clone.(*api.PersistentVolume)
if !ok {
return fmt.Errorf("Unexpected pv cast error : %v\n", volumeClone)
}
volumeClone.Annotations[pvProvisioningRequiredAnnotationKey] = pvProvisioningCompletedAnnotationValue
pv, err = controller.client.UpdatePersistentVolume(volumeClone)
if err != nil {
// TODO: https://github.com/kubernetes/kubernetes/issues/14443
// the volume was created in the infrastructure and likely has a PV name on it,
// but we failed to save the annotation that marks the volume as provisioned.
return fmt.Errorf("Error updating PersistentVolume[%s] with provisioning completed annotation. There is a potential for dupes and orphans.", volumeClone.Name)
}
return nil
}
// Run starts all of this controller's control loops
func (controller *PersistentVolumeProvisionerController) Run() {
glog.V(5).Infof("Starting PersistentVolumeProvisionerController\n")
if controller.stopChannels == nil {
controller.stopChannels = make(map[string]chan struct{})
}
if _, exists := controller.stopChannels[volumesStopChannel]; !exists {
controller.stopChannels[volumesStopChannel] = make(chan struct{})
go controller.volumeController.Run(controller.stopChannels[volumesStopChannel])
}
if _, exists := controller.stopChannels[claimsStopChannel]; !exists {
controller.stopChannels[claimsStopChannel] = make(chan struct{})
go controller.claimController.Run(controller.stopChannels[claimsStopChannel])
}
}
// Stop gracefully shuts down this controller
func (controller *PersistentVolumeProvisionerController) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeProvisionerController\n")
for name, stopChan := range controller.stopChannels {
close(stopChan)
delete(controller.stopChannels, name)
}
}
func newProvisioner(plugin volume.ProvisionableVolumePlugin, claim *api.PersistentVolumeClaim) (volume.Provisioner, error) {
volumeOptions := volume.VolumeOptions{
Capacity: claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
AccessModes: claim.Spec.AccessModes,
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
}
provisioner, err := plugin.NewProvisioner(volumeOptions)
return provisioner, err
}
// controllerClient abstracts access to PVs and PVCs. Easy to mock for testing and wrap for real client.
type controllerClient interface {
CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error)
ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error)
WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error)
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
DeletePersistentVolume(volume *api.PersistentVolume) error
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error)
WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
// provided to give VolumeHost and plugins access to the kube client
GetKubeClient() client.Interface
}
func NewControllerClient(c client.Interface) controllerClient {
return &realControllerClient{c}
}
var _ controllerClient = &realControllerClient{}
type realControllerClient struct {
client client.Interface
}
func (c *realControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
}
func (c *realControllerClient) ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) {
return c.client.PersistentVolumes().List(options)
}
func (c *realControllerClient) WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) {
return c.client.PersistentVolumes().Watch(options)
}
func (c *realControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Create(pv)
}
func (c *realControllerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
}
func (c *realControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.PersistentVolumes().Delete(volume.Name)
}
func (c *realControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}
func (c *realControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(namespace).Get(name)
}
func (c *realControllerClient) ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) {
return c.client.PersistentVolumeClaims(namespace).List(options)
}
func (c *realControllerClient) WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) {
return c.client.PersistentVolumeClaims(namespace).Watch(options)
}
func (c *realControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}
func (c *realControllerClient) GetKubeClient() client.Interface {
return c.client
}
func keyExists(key string, haystack map[string]string) bool {
_, exists := haystack[key]
return exists
}
func isProvisioningComplete(pv *api.PersistentVolume) bool {
return isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, pv.Annotations)
}
func isAnnotationMatch(key, needle string, haystack map[string]string) bool {
value, exists := haystack[key]
if !exists {
return false
}
return value == needle
}
func isRecyclable(policy api.PersistentVolumeReclaimPolicy) bool {
return policy == api.PersistentVolumeReclaimDelete || policy == api.PersistentVolumeReclaimRecycle
}
// VolumeHost implementation
// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
func (c *PersistentVolumeProvisionerController) GetPluginDir(podUID string) string {
return ""
}
func (c *PersistentVolumeProvisionerController) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
}
func (c *PersistentVolumeProvisionerController) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
func (c *PersistentVolumeProvisionerController) GetKubeClient() client.Interface {
return c.client.GetKubeClient()
}
func (c *PersistentVolumeProvisionerController) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions) (volume.Builder, error) {
return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation")
}
func (c *PersistentVolumeProvisionerController) NewWrapperCleaner(spec *volume.Spec, podUID types.UID) (volume.Cleaner, error) {
return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation")
}
func (c *PersistentVolumeProvisionerController) GetCloudProvider() cloudprovider.Interface {
return c.cloud
}
func (c *PersistentVolumeProvisionerController) GetMounter() mount.Interface {
return nil
}
func (c *PersistentVolumeProvisionerController) GetWriter() io.Writer {
return nil
}
func (c *PersistentVolumeProvisionerController) GetHostName() string {
return ""
}
const (
// these pair of constants are used by the provisioner.
// The key is a kube namespaced key that denotes a volume requires provisioning.
// The value is set only when provisioning is completed. Any other value will tell the provisioner
// that provisioning has not yet occurred.
pvProvisioningRequiredAnnotationKey = "volume.experimental.kubernetes.io/provisioning-required"
pvProvisioningCompletedAnnotationValue = "volume.experimental.kubernetes.io/provisioning-completed"
)

View File

@ -0,0 +1,240 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/watch"
)
func TestProvisionerRunStop(t *testing.T) {
controller, _ := makeTestController()
if len(controller.stopChannels) != 0 {
t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels))
}
controller.Run()
if len(controller.stopChannels) != 2 {
t.Errorf("Running provisioner should have exactly 2 stopChannels. Got %v", len(controller.stopChannels))
}
controller.Stop()
if len(controller.stopChannels) != 0 {
t.Errorf("Non-running provisioner should not have any stopChannels. Got %v", len(controller.stopChannels))
}
}
func makeTestVolume() *api.PersistentVolume {
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{},
Name: "pv01",
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("10Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data01",
},
},
},
}
}
func makeTestClaim() *api.PersistentVolumeClaim {
return &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{},
Name: "claim01",
Namespace: "ns",
SelfLink: testapi.Default.SelfLink("pvc", ""),
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("8G"),
},
},
},
}
}
func makeTestController() (*PersistentVolumeProvisionerController, *mockControllerClient) {
mockClient := &mockControllerClient{}
mockVolumePlugin := &volume.FakeVolumePlugin{}
controller, _ := NewPersistentVolumeProvisionerController(mockClient, 1*time.Second, nil, mockVolumePlugin, &fake_cloud.FakeCloud{})
return controller, mockClient
}
func TestReconcileClaim(t *testing.T) {
controller, mockClient := makeTestController()
pvc := makeTestClaim()
// watch would have added the claim to the store
controller.claimStore.Add(pvc)
err := controller.reconcileClaim(pvc)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// non-provisionable PVC should not have created a volume on reconciliation
if mockClient.volume != nil {
t.Error("Unexpected volume found in mock client. Expected nil")
}
pvc.Annotations[qosProvisioningKey] = "foo"
err = controller.reconcileClaim(pvc)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
// PVC requesting provisioning should have a PV created for it
if mockClient.volume == nil {
t.Error("Expected to find bound volume but got nil")
}
if mockClient.volume.Spec.ClaimRef.Name != pvc.Name {
t.Errorf("Expected PV to be bound to %s but got %s", mockClient.volume.Spec.ClaimRef.Name, pvc.Name)
}
}
func TestReconcileVolume(t *testing.T) {
controller, mockClient := makeTestController()
pv := makeTestVolume()
pvc := makeTestClaim()
err := controller.reconcileVolume(pv)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
// watch adds claim to the store.
// we need to add it to our mock client to mimic normal Get call
controller.claimStore.Add(pvc)
mockClient.claim = pvc
// pretend the claim and volume are bound, no provisioning required
claimRef, _ := api.GetReference(pvc)
pv.Spec.ClaimRef = claimRef
err = controller.reconcileVolume(pv)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
pv.Annotations[pvProvisioningRequiredAnnotationKey] = "!pvProvisioningCompleted"
pv.Annotations[qosProvisioningKey] = "foo"
err = controller.reconcileVolume(pv)
if !isAnnotationMatch(pvProvisioningRequiredAnnotationKey, pvProvisioningCompletedAnnotationValue, mockClient.volume.Annotations) {
t.Errorf("Expected %s but got %s", pvProvisioningRequiredAnnotationKey, mockClient.volume.Annotations[pvProvisioningRequiredAnnotationKey])
}
}
var _ controllerClient = &mockControllerClient{}
type mockControllerClient struct {
volume *api.PersistentVolume
claim *api.PersistentVolumeClaim
}
func (c *mockControllerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.volume, nil
}
func (c *mockControllerClient) CreatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
if pv.GenerateName != "" && pv.Name == "" {
pv.Name = fmt.Sprintf(pv.GenerateName, util.NewUUID())
}
c.volume = pv
return c.volume, nil
}
func (c *mockControllerClient) ListPersistentVolumes(options unversioned.ListOptions) (*api.PersistentVolumeList, error) {
return &api.PersistentVolumeList{
Items: []api.PersistentVolume{*c.volume},
}, nil
}
func (c *mockControllerClient) WatchPersistentVolumes(options unversioned.ListOptions) (watch.Interface, error) {
return watch.NewFake(), nil
}
func (c *mockControllerClient) UpdatePersistentVolume(pv *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.CreatePersistentVolume(pv)
}
func (c *mockControllerClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
c.volume = nil
return nil
}
func (c *mockControllerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockControllerClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
if c.claim != nil {
return c.claim, nil
} else {
return nil, errors.NewNotFound("persistentVolume", name)
}
}
func (c *mockControllerClient) ListPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (*api.PersistentVolumeClaimList, error) {
return &api.PersistentVolumeClaimList{
Items: []api.PersistentVolumeClaim{*c.claim},
}, nil
}
func (c *mockControllerClient) WatchPersistentVolumeClaims(namespace string, options unversioned.ListOptions) (watch.Interface, error) {
return watch.NewFake(), nil
}
func (c *mockControllerClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
c.claim = claim
return c.claim, nil
}
func (c *mockControllerClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func (c *mockControllerClient) GetKubeClient() client.Interface {
return nil
}

View File

@ -46,14 +46,16 @@ type PersistentVolumeRecycler struct {
client recyclerClient
kubeClient client.Interface
pluginMgr volume.VolumePluginMgr
cloud cloudprovider.Interface
}
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) {
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
kubeClient: kubeClient,
cloud: cloud,
}
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
@ -283,7 +285,7 @@ func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID t
}
func (f *PersistentVolumeRecycler) GetCloudProvider() cloudprovider.Interface {
return nil
return f.cloud
}
func (f *PersistentVolumeRecycler) GetMounter() mount.Interface {

View File

@ -21,15 +21,15 @@ import (
"sort"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/cache"
)
const (
// A PV created specifically for one claim must contain this annotation in order to bind to the claim.
// The value must be the namespace and name of the claim being bound to (i.e, claim.Namespace/claim.Name)
// This is an experimental feature and likely to change in the future.
createdForKey = "volume.extensions.kubernetes.io/provisioned-for"
// A PVClaim can request a quality of service tier by adding this annotation. The value of the annotation
// is arbitrary. The values are pre-defined by a cluster admin and known to users when requesting a QoS.
// For example tiers might be gold, silver, and tin and the admin configures what that means for each volume plugin that can provision a volume.
// Values in the alpha version of this feature are not meaningful, but will be in the full version of this feature.
qosProvisioningKey = "volume.alpha.kubernetes.io/storage-class"
)
// persistentVolumeOrderedIndex is a cache.Store that keeps persistent volumes indexed by AccessModes and ordered by storage capacity.
@ -80,10 +80,7 @@ func (pvIndex *persistentVolumeOrderedIndex) ListByAccessModes(modes []api.Persi
type matchPredicate func(compareThis, toThis *api.PersistentVolume) bool
// find returns the nearest PV from the ordered list or nil if a match is not found
func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
// the 'searchPV' argument is a synthetic PV with capacity and accessmodes set according to the user's PersistentVolumeClaim.
// the synthetic pv arg is, therefore, a request for a storage resource.
//
func (pvIndex *persistentVolumeOrderedIndex) findByClaim(claim *api.PersistentVolumeClaim, matchPredicate matchPredicate) (*api.PersistentVolume, error) {
// PVs are indexed by their access modes to allow easier searching. Each index is the string representation of a set of access modes.
// There is a finite number of possible sets and PVs will only be indexed in one of them (whichever index matches the PV's modes).
//
@ -92,17 +89,7 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume
//
// Searches are performed against a set of access modes, so we can attempt not only the exact matching modes but also
// potential matches (the GCEPD example above).
allPossibleModes := pvIndex.allPossibleMatchingAccessModes(searchPV.Spec.AccessModes)
// the searchPV should contain an annotation that allows pre-binding to a claim.
// we can use the same annotation value (pvc's namespace/name) and check against
// existing volumes to find an exact match. It is possible that a bind is made (ClaimRef persisted to PV)
// but the fail to update claim.Spec.VolumeName fails. This check allows the claim to find the volume
// that's already bound to the claim.
preboundClaim := ""
if createdFor, ok := searchPV.Annotations[createdForKey]; ok {
preboundClaim = createdFor
}
allPossibleModes := pvIndex.allPossibleMatchingAccessModes(claim.Spec.AccessModes)
for _, modes := range allPossibleModes {
volumes, err := pvIndex.ListByAccessModes(modes)
@ -115,19 +102,34 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume
// return the exact pre-binding match, if found
unboundVolumes := []*api.PersistentVolume{}
for _, volume := range volumes {
// volume isn't currently bound or pre-bound.
if volume.Spec.ClaimRef == nil {
// volume isn't currently bound or pre-bound.
unboundVolumes = append(unboundVolumes, volume)
continue
}
boundClaim := fmt.Sprintf("%s/%s", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if boundClaim == preboundClaim {
if claim.Name == volume.Spec.ClaimRef.Name && claim.Namespace == volume.Spec.ClaimRef.Namespace {
// exact match! No search required.
return volume, nil
}
}
// a claim requesting provisioning will have an exact match pre-bound to the claim.
// no need to search through unbound volumes. The matching volume will be created by the provisioner
// and will match above when the claim is re-processed by the binder.
if keyExists(qosProvisioningKey, claim.Annotations) {
return nil, nil
}
searchPV := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: claim.Spec.AccessModes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
},
},
}
i := sort.Search(len(unboundVolumes), func(i int) bool { return matchPredicate(searchPV, unboundVolumes[i]) })
if i < len(unboundVolumes) {
return unboundVolumes[i], nil
@ -136,27 +138,9 @@ func (pvIndex *persistentVolumeOrderedIndex) find(searchPV *api.PersistentVolume
return nil, nil
}
// findByAccessModesAndStorageCapacity is a convenience method that calls Find w/ requisite matchPredicate for storage
func (pvIndex *persistentVolumeOrderedIndex) findByAccessModesAndStorageCapacity(prebindKey string, modes []api.PersistentVolumeAccessMode, qty resource.Quantity) (*api.PersistentVolume, error) {
pv := &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
createdForKey: prebindKey,
},
},
Spec: api.PersistentVolumeSpec{
AccessModes: modes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): qty,
},
},
}
return pvIndex.find(pv, matchStorageCapacity)
}
// findBestMatchForClaim is a convenience method that finds a volume by the claim's AccessModes and requests for Storage
func (pvIndex *persistentVolumeOrderedIndex) findBestMatchForClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolume, error) {
return pvIndex.findByAccessModesAndStorageCapacity(fmt.Sprintf("%s/%s", claim.Namespace, claim.Name), claim.Spec.AccessModes, claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)])
return pvIndex.findByClaim(claim, matchStorageCapacity)
}
// byCapacity is used to order volumes by ascending storage size
@ -268,3 +252,7 @@ func (c byAccessModes) Swap(i, j int) {
func (c byAccessModes) Len() int {
return len(c.modes)
}
func claimToClaimKey(claim *api.PersistentVolumeClaim) string {
return fmt.Sprintf("%s/%s", claim.Namespace, claim.Name)
}

View File

@ -22,7 +22,6 @@ import (
"regexp"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/volume"
@ -35,11 +34,11 @@ import (
func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
host: nil,
newRecyclerFunc: newRecycler,
newDeleterFunc: newDeleter,
newCreaterFunc: newCreater,
config: volumeConfig,
host: nil,
newRecyclerFunc: newRecycler,
newDeleterFunc: newDeleter,
newProvisionerFunc: newProvisioner,
config: volumeConfig,
},
}
}
@ -47,28 +46,28 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
host: nil,
newRecyclerFunc: recyclerFunc,
newCreaterFunc: newCreater,
config: volumeConfig,
host: nil,
newRecyclerFunc: recyclerFunc,
newProvisionerFunc: newProvisioner,
config: volumeConfig,
},
}
}
type hostPathPlugin struct {
host volume.VolumeHost
// decouple creating Recyclers/Deleters/Creaters by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error)
newCreaterFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Creater, error)
config volume.VolumeConfig
// decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error)
newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error)
config volume.VolumeConfig
}
var _ volume.VolumePlugin = &hostPathPlugin{}
var _ volume.PersistentVolumePlugin = &hostPathPlugin{}
var _ volume.RecyclableVolumePlugin = &hostPathPlugin{}
var _ volume.DeletableVolumePlugin = &hostPathPlugin{}
var _ volume.CreatableVolumePlugin = &hostPathPlugin{}
var _ volume.ProvisionableVolumePlugin = &hostPathPlugin{}
const (
hostPathPluginName = "kubernetes.io/host-path"
@ -124,11 +123,11 @@ func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, err
return plugin.newDeleterFunc(spec, plugin.host)
}
func (plugin *hostPathPlugin) NewCreater(options volume.VolumeOptions) (volume.Creater, error) {
func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volume.Provisioner, error) {
if len(options.AccessModes) == 0 {
options.AccessModes = plugin.GetAccessModes()
}
return plugin.newCreaterFunc(options, plugin.host)
return plugin.newProvisionerFunc(options, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
@ -154,8 +153,8 @@ func newDeleter(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, erro
return &hostPathDeleter{spec.Name(), path, host, volume.NewMetricsDu(path)}, nil
}
func newCreater(options volume.VolumeOptions, host volume.VolumeHost) (volume.Creater, error) {
return &hostPathCreater{options: options, host: host}, nil
func newProvisioner(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) {
return &hostPathProvisioner{options: options, host: host}, nil
}
// HostPath volumes represent a bare host file or directory mount.
@ -215,7 +214,7 @@ func (c *hostPathCleaner) TearDownAt(dir string) error {
return fmt.Errorf("TearDownAt() does not make sense for host paths")
}
// hostPathRecycler implements a dynamic provisioning Recycler for the HostPath plugin
// hostPathRecycler implements a Recycler for the HostPath plugin
// This implementation is meant for testing only and only works in a single node cluster
type hostPathRecycler struct {
name string
@ -246,34 +245,36 @@ func (r *hostPathRecycler) Recycle() error {
return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient())
}
// hostPathCreater implements a dynamic provisioning Creater for the HostPath plugin
// hostPathProvisioner implements a Provisioner for the HostPath plugin
// This implementation is meant for testing only and only works in a single node cluster.
type hostPathCreater struct {
type hostPathProvisioner struct {
host volume.VolumeHost
options volume.VolumeOptions
}
// Create for hostPath simply creates a local /tmp/hostpath_pv/%s directory as a new PersistentVolume.
// This Creater is meant for development and testing only and WILL NOT WORK in a multi-node cluster.
func (r *hostPathCreater) Create() (*api.PersistentVolume, error) {
fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID())
err := os.MkdirAll(fullpath, 0750)
if err != nil {
return nil, err
// This Provisioner is meant for development and testing only and WILL NOT WORK in a multi-node cluster.
func (r *hostPathProvisioner) Provision(pv *api.PersistentVolume) error {
if pv.Spec.HostPath == nil {
return fmt.Errorf("pv.Spec.HostPath cannot be nil")
}
return os.MkdirAll(pv.Spec.HostPath.Path, 0750)
}
func (r *hostPathProvisioner) NewPersistentVolumeTemplate() (*api.PersistentVolume, error) {
fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID())
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-hostpath-",
Labels: map[string]string{
"createdby": "hostpath dynamic provisioner",
Annotations: map[string]string{
"kubernetes.io/createdby": "hostpath-dynamic-provisioner",
},
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: r.options.PersistentVolumeReclaimPolicy,
AccessModes: r.options.AccessModes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse(fmt.Sprintf("%dMi", r.options.CapacityMB)),
api.ResourceName(api.ResourceStorage): r.options.Capacity,
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{

View File

@ -145,7 +145,7 @@ func TestDeleterTempDir(t *testing.T) {
}
}
func TestCreater(t *testing.T) {
func TestProvisioner(t *testing.T) {
tempPath := "/tmp/hostpath/"
defer os.RemoveAll(tempPath)
err := os.MkdirAll(tempPath, 0750)
@ -157,18 +157,18 @@ func TestCreater(t *testing.T) {
if err != nil {
t.Errorf("Can't find the plugin by name")
}
creater, err := plug.NewCreater(volume.VolumeOptions{CapacityMB: 100, PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete})
creater, err := plug.NewProvisioner(volume.VolumeOptions{Capacity: resource.MustParse("1Gi"), PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete})
if err != nil {
t.Errorf("Failed to make a new Creater: %v", err)
t.Errorf("Failed to make a new Provisioner: %v", err)
}
pv, err := creater.Create()
pv, err := creater.NewPersistentVolumeTemplate()
if err != nil {
t.Errorf("Unexpected error creating volume: %v", err)
}
if pv.Spec.HostPath.Path == "" {
t.Errorf("Expected pv.Spec.HostPath.Path to not be empty: %#v", pv)
}
expectedCapacity := resource.NewQuantity(100*1024*1024, resource.BinarySI)
expectedCapacity := resource.NewQuantity(1*1024*1024*1024, resource.BinarySI)
actualCapacity := pv.Spec.Capacity[api.ResourceStorage]
expectedAmt := expectedCapacity.Value()
actualAmt := actualCapacity.Value()

View File

@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
@ -39,11 +40,11 @@ type VolumeOptions struct {
// it will be replaced and expanded on by future SecurityContext work.
RootContext string
// The attributes below are required by volume.Creater
// perhaps CreaterVolumeOptions struct?
// The attributes below are required by volume.Provisioner
// TODO: refactor all of this out of volumes when an admin can configure many kinds of provisioners.
// CapacityMB is the size in MB of a volume.
CapacityMB int
// Capacity is the size of a volume.
Capacity resource.Quantity
// AccessModes of a volume
AccessModes []api.PersistentVolumeAccessMode
// Reclamation policy for a persistent volume
@ -106,12 +107,12 @@ type DeletableVolumePlugin interface {
NewDeleter(spec *Spec) (Deleter, error)
}
// CreatableVolumePlugin is an extended interface of VolumePlugin and is used to create volumes for the cluster.
type CreatableVolumePlugin interface {
// ProvisionableVolumePlugin is an extended interface of VolumePlugin and is used to create volumes for the cluster.
type ProvisionableVolumePlugin interface {
VolumePlugin
// NewCreater creates a new volume.Creater which knows how to create PersistentVolumes in accordance with
// NewProvisioner creates a new volume.Provisioner which knows how to create PersistentVolumes in accordance with
// the plugin's underlying storage provider
NewCreater(options VolumeOptions) (Creater, error)
NewProvisioner(options VolumeOptions) (Provisioner, error)
}
// VolumeHost is an interface that plugins can use to access the kubelet.
@ -365,13 +366,13 @@ func (pm *VolumePluginMgr) FindDeletablePluginBySpec(spec *Spec) (DeletableVolum
// FindCreatablePluginBySpec fetches a persistent volume plugin by name. If no plugin
// is found, returns error.
func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (CreatableVolumePlugin, error) {
func (pm *VolumePluginMgr) FindCreatablePluginBySpec(spec *Spec) (ProvisionableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
if creatableVolumePlugin, ok := volumePlugin.(CreatableVolumePlugin); ok {
return creatableVolumePlugin, nil
if provisionableVolumePlugin, ok := volumePlugin.(ProvisionableVolumePlugin); ok {
return provisionableVolumePlugin, nil
}
return nil, fmt.Errorf("no creatable volume plugin matched")
}

View File

@ -117,10 +117,13 @@ func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
type FakeVolumePlugin struct {
PluginName string
Host VolumeHost
Config VolumeConfig
}
var _ VolumePlugin = &FakeVolumePlugin{}
var _ RecyclableVolumePlugin = &FakeVolumePlugin{}
var _ DeletableVolumePlugin = &FakeVolumePlugin{}
var _ ProvisionableVolumePlugin = &FakeVolumePlugin{}
func (plugin *FakeVolumePlugin) Init(host VolumeHost) {
plugin.Host = host
@ -151,6 +154,10 @@ func (plugin *FakeVolumePlugin) NewDeleter(spec *Spec) (Deleter, error) {
return &FakeDeleter{"/attributesTransferredFromSpec", MetricsNil{}}, nil
}
func (plugin *FakeVolumePlugin) NewProvisioner(options VolumeOptions) (Provisioner, error) {
return &FakeProvisioner{options, plugin.Host}, nil
}
func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
return []api.PersistentVolumeAccessMode{}
}
@ -227,3 +234,36 @@ func (fd *FakeDeleter) Delete() error {
func (fd *FakeDeleter) GetPath() string {
return fd.path
}
type FakeProvisioner struct {
Options VolumeOptions
Host VolumeHost
}
func (fc *FakeProvisioner) NewPersistentVolumeTemplate() (*api.PersistentVolume, error) {
fullpath := fmt.Sprintf("/tmp/hostpath_pv/%s", util.NewUUID())
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-fakeplugin-",
Annotations: map[string]string{
"kubernetes.io/createdby": "fakeplugin-provisioner",
},
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: fc.Options.PersistentVolumeReclaimPolicy,
AccessModes: fc.Options.AccessModes,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): fc.Options.Capacity,
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: fullpath,
},
},
},
}, nil
}
func (fc *FakeProvisioner) Provision(pv *api.PersistentVolume) error {
return nil
}

View File

@ -100,10 +100,16 @@ type Recycler interface {
Recycle() error
}
// Create adds a new resource in the storage provider and creates a PersistentVolume for the new resource.
// Calls to Create should block until complete.
type Creater interface {
Create() (*api.PersistentVolume, error)
// Provisioner is an interface that creates templates for PersistentVolumes and can create the volume
// as a new resource in the infrastructure provider.
type Provisioner interface {
// Provision creates the resource by allocating the underlying volume in a storage system.
// This method should block until completion.
Provision(*api.PersistentVolume) error
// NewPersistentVolumeTemplate creates a new PersistentVolume to be used as a template before saving.
// The provisioner will want to tweak its properties, assign correct annotations, etc.
// This func should *NOT* persist the PV in the API. That is left to the caller.
NewPersistentVolumeTemplate() (*api.PersistentVolume, error)
}
// Delete removes the resource from the underlying storage provider. Calls to this method should block until
@ -111,6 +117,7 @@ type Creater interface {
// A nil return indicates success.
type Deleter interface {
Volume
// This method should block until completion.
Delete() error
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
client "k8s.io/kubernetes/pkg/client/unversioned"
fake_cloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/persistentvolume"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/volume"
@ -48,12 +49,16 @@ func TestPersistentVolumeRecycler(t *testing.T) {
binderClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
recyclerClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
testClient := client.NewOrDie(&client.Config{Host: s.URL, GroupVersion: testapi.Default.GroupVersion()})
host := volume.NewFakeVolumeHost("/tmp/fake", nil, nil)
binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Minute)
plugins := []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", host, volume.VolumeConfig{}}}
cloud := &fake_cloud.FakeCloud{}
binder := persistentvolumecontroller.NewPersistentVolumeClaimBinder(binderClient, 10*time.Second)
binder.Run()
defer binder.Stop()
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Minute, []volume.VolumePlugin{&volume.FakeVolumePlugin{"plugin-name", volume.NewFakeVolumeHost("/tmp/fake", nil, nil)}})
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, plugins, cloud)
recycler.Run()
defer recycler.Stop()