provisioning: Implement provisioner

Jan Safranek 2016-05-17 14:55:25 +02:00
parent 75b0e2ad63
commit 514d595881
4 changed files with 197 additions and 25 deletions

View File

@ -384,6 +384,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))

View File

@ -282,6 +282,7 @@ func (s *CMServer) Run(_ []string) error {

View File

@ -80,6 +80,30 @@ const annBoundByController = ""
// Value of this annotation should be empty.
const annClass = ""
// This annotation is added to a PV that has been dynamically provisioned by
// Kubernetes. Its value is the name of the volume plugin that created the volume.
// It serves both the user (to show where a PV comes from) and Kubernetes (to
// recognize dynamically provisioned PVs in its decisions).
const annDynamicallyProvisioned = ""
// Name of a tag attached to a real volume in cloud (e.g. AWS EBS or GCE PD)
// with namespace of a persistent volume claim used to create this volume.
const cloudVolumeCreatedForClaimNamespaceTag = ""
// Name of a tag attached to a real volume in cloud (e.g. AWS EBS or GCE PD)
// with name of a persistent volume claim used to create this volume.
const cloudVolumeCreatedForClaimNameTag = ""
// Name of a tag attached to a real volume in cloud (e.g. AWS EBS or GCE PD)
// with name of appropriate Kubernetes persistent volume.
const cloudVolumeCreatedForVolumeNameTag = ""
// Number of retries when we create a PV object for a provisioned volume.
const createProvisionedPVRetryCount = 5
// Interval between retries when we create a PV object for a provisioned volume.
const createProvisionedPVInterval = 10 * time.Second
// PersistentVolumeController is a controller that synchronizes
// PersistentVolumeClaims and PersistentVolumes. It starts two
// framework.Controllers that watch PersistentVolume and PersistentVolumeClaim
@ -95,6 +119,8 @@ type PersistentVolumeController struct {
eventRecorder record.EventRecorder
cloud cloudprovider.Interface
recyclePluginMgr vol.VolumePluginMgr
provisioner vol.ProvisionableVolumePlugin
clusterName string
// PersistentVolumeController keeps track of long running operations and
// makes sure it won't start the same operation twice in parallel.
@ -112,6 +138,9 @@ type PersistentVolumeController struct {
// For testing only: hook to call before an asynchronous operation starts.
// Not used when set to nil.
preOperationHook func(operationName string, operationArgument interface{})
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration
// NewPersistentVolumeController creates a new PersistentVolumeController
@ -120,19 +149,29 @@ func NewPersistentVolumeController(
syncPeriod time.Duration,
provisioner vol.ProvisionableVolumePlugin,
recyclers []vol.VolumePlugin,
cloud cloudprovider.Interface) *PersistentVolumeController {
cloud cloudprovider.Interface,
clusterName string) *PersistentVolumeController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
recorder := broadcaster.NewRecorder(api.EventSource{Component: "persistentvolume-controller"})
controller := &PersistentVolumeController{
kubeClient: kubeClient,
eventRecorder: recorder,
runningOperations: make(map[string]bool),
cloud: cloud,
kubeClient: kubeClient,
eventRecorder: recorder,
runningOperations: make(map[string]bool),
cloud: cloud,
provisioner: provisioner,
clusterName: clusterName,
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
createProvisionedPVInterval: createProvisionedPVInterval,
controller.recyclePluginMgr.InitPlugins(recyclers, controller)
if controller.provisioner != nil {
if err := controller.provisioner.Init(controller); err != nil {
glog.Errorf("PersistentVolumeController: error initializing provisioner plugin: %v", err)
volumeSource := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
@ -376,25 +415,10 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *api.PersistentVo
// No PV could be found
// OBSERVATION: pvc is "Pending", will retry
if hasAnnotation(claim.ObjectMeta, annClass) {
// TODO: provisioning
//plugin := findProvisionerPluginForPV(pv) // Need to flesh this out
//if plugin != nil {
//FIXME: left off here
// No match was found and provisioning was requested.
// maintain a map with the current provisioner goroutines that are running
// if the key is already present in the map, return
// launch the goroutine that:
// 1. calls plugin.Provision to make the storage asset
// 2. gets back a PV object (partially filled)
// 3. create the PV API object, with claimRef -> pvc
// 4. deletes itself from the map when it's done
// return
//} else {
// make an event calling out that no provisioner was configured
// return, try later?
if err = ctrl.provisionClaim(claim); err != nil {
return err
return nil
// Mark the claim as Pending and try to find a match in the next
// periodic syncClaim
@ -1036,7 +1060,6 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
glog.Errorf("Cannot convert recycleVolumeOperation argument to volume, got %+v", arg)
glog.V(4).Infof("recycleVolumeOperation [%s] started", volume.Name)
// This method may have been waiting for a volume lock for some time.
@ -1241,6 +1264,149 @@ func (ctrl *PersistentVolumeController) doDeleteVolume(volume *api.PersistentVol
return nil
// provisionClaim starts a new asynchronous operation to provision a volume
// for the given claim. The operation is handed to ctrl.scheduleOperation under
// the name "provision-<claim UID>" and runs ctrl.provisionClaimOperation with
// the claim as its argument. As written here, the returned error is always nil.
func (ctrl *PersistentVolumeController) provisionClaim(claim *api.PersistentVolumeClaim) error {
glog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
ctrl.scheduleOperation("provision-"+string(claim.UID), ctrl.provisionClaimOperation, claim)
return nil
// provisionClaimOperation provisions a volume. This method is running in
// standalone goroutine and already has all necessary locks.
func (ctrl *PersistentVolumeController) provisionClaimOperation(claimObj interface{}) {
claim, ok := claimObj.(*api.PersistentVolumeClaim)
if !ok {
glog.Errorf("Cannot convert provisionClaimOperation argument to claim, got %+v", claimObj)
glog.V(4).Infof("provisionClaimOperation [%s] started", claimToClaimKey(claim))
// A previous doProvisionClaim may just have finished while we were waiting for
// the locks. Check that PV (with deterministic name) hasn't been provisioned
// yet.
pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
volume, err := ctrl.kubeClient.Core().PersistentVolumes().Get(pvName)
if err == nil && volume != nil {
// Volume has been already provisioned, nothing to do.
glog.V(4).Infof("provisionClaimOperation [%s]: volume already exists, skipping", claimToClaimKey(claim))
// Prepare a claimRef to the claim early (to fail before a volume is
// provisioned)
claimRef, err := api.GetReference(claim)
if err != nil {
glog.V(3).Infof("unexpected error getting claim reference: %v", err)
// TODO: find provisionable plugin based on a class/profile
plugin := ctrl.provisioner
if plugin == nil {
// No provisioner found. Emit an event.
ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", "No provisioner plugin found for the claim!")
glog.V(2).Infof("no provisioner plugin found for claim %s!", claimToClaimKey(claim))
// The controller will retry provisioning the volume in every
// syncVolume() call.
// Gather provisioning options
tags := make(map[string]string)
tags[cloudVolumeCreatedForClaimNamespaceTag] = claim.Namespace
tags[cloudVolumeCreatedForClaimNameTag] = claim.Name
tags[cloudVolumeCreatedForVolumeNameTag] = pvName
options := vol.VolumeOptions{
Capacity: claim.Spec.Resources.Requests[api.ResourceName(api.ResourceStorage)],
AccessModes: claim.Spec.AccessModes,
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimDelete,
CloudTags: &tags,
ClusterName: ctrl.clusterName,
PVName: pvName,
// Provision the volume
provisioner, err := plugin.NewProvisioner(options)
if err != nil {
strerr := fmt.Sprintf("Failed to create provisioner: %v", err)
glog.V(2).Infof("failed to create provisioner for claim %q: %v", claimToClaimKey(claim), err)
ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", strerr)
volume, err = provisioner.Provision()
if err != nil {
strerr := fmt.Sprintf("Failed to provision volume: %v", err)
glog.V(2).Infof("failed to provision volume for claim %q: %v", claimToClaimKey(claim), err)
ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", strerr)
glog.V(3).Infof("volume %q for claim %q created", volume.Name, claimToClaimKey(claim))
// Create Kubernetes PV object for the volume.
volume.Name = pvName
// Bind it to the claim
volume.Spec.ClaimRef = claimRef
volume.Status.Phase = api.VolumeBound
// Add annBoundByController (used in deleting the volume)
setAnnotation(&volume.ObjectMeta, annBoundByController, "yes")
setAnnotation(&volume.ObjectMeta, annDynamicallyProvisioned, plugin.Name())
// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
glog.V(4).Infof("provisionClaimOperation [%s]: trying to save volume %s", claimToClaimKey(claim), volume.Name)
if _, err = ctrl.kubeClient.Core().PersistentVolumes().Create(volume); err == nil {
// Save succeeded.
glog.V(3).Infof("volume %q for claim %q saved", volume.Name, claimToClaimKey(claim))
// Save failed, try again after a while.
glog.V(3).Infof("failed to save volume %q for claim %q: %v", volume.Name, claimToClaimKey(claim), err)
if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningFailed", strerr)
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
if err = ctrl.doDeleteVolume(volume); err == nil {
// Delete succeeded
glog.V(4).Infof("provisionClaimOperation [%s]: cleaning volume %s succeeded", claimToClaimKey(claim), volume.Name)
// Delete failed, try again after a while.
glog.V(3).Infof("failed to delete volume %q: %v", volume.Name, i, err)
if err != nil {
// Delete failed several times. There is orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
ctrl.eventRecorder.Event(claim, api.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
} else {
glog.V(2).Infof("volume %q provisioned for claim %q", volume.Name, claimToClaimKey(claim))
// getProvisionedVolumeNameForClaim returns PV.Name for the provisioned volume.
// The name must be unique; it is built from a fixed prefix followed by the
// claim UID, so the same claim always yields the same PV name.
func (ctrl *PersistentVolumeController) getProvisionedVolumeNameForClaim(claim *api.PersistentVolumeClaim) string {
return "pv-provisioned-for-" + string(claim.UID)
// scheduleOperation starts given asynchronous operation on given volume. It
// makes sure the operation is not already running.
func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func(arg interface{}), arg interface{}) {

View File

@ -424,6 +424,10 @@ func newPersistentVolumeController(kubeClient clientset.Interface) *PersistentVo
kubeClient: kubeClient,
eventRecorder: record.NewFakeRecorder(1000),
runningOperations: make(map[string]bool),
// Speed up the testing
createProvisionedPVRetryCount: createProvisionedPVRetryCount,
createProvisionedPVInterval: 5 * time.Millisecond,
return ctrl