// k3s/pkg/controller/persistentvolume/persistentvolume_claim_bind...
//
// 491 lines
// 18 KiB
// Go

/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package persistentvolume
import (
"fmt"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims.
// It runs one informer for PersistentVolumes and one for PersistentVolumeClaims and
// reacts to their add/update/delete events.
type PersistentVolumeClaimBinder struct {
// volumeIndex is the local index of volumes; the event handlers add, update and delete entries in it.
volumeIndex *persistentVolumeOrderedIndex
// volumeController drives the PersistentVolume informer started by Run.
volumeController *framework.Controller
// claimController drives the PersistentVolumeClaim informer started by Run.
claimController *framework.Controller
// client abstracts the API calls used to read and update volumes and claims.
client binderClient
// stopChannels holds one stop channel per running control loop, keyed by "volumes" and "claims".
stopChannels map[string]chan struct{}
// lock is taken exclusively by every event handler, so handlers never run concurrently.
lock sync.RWMutex
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder.
//
// The binder is wired to two informers built on kubeClient: one listing and
// watching PersistentVolumes, the other listing and watching
// PersistentVolumeClaims across all namespaces. syncPeriod is handed to both
// informers. The control loops are not started here; call Run for that.
func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
	binder := &PersistentVolumeClaimBinder{
		volumeIndex: NewPersistentVolumeOrderedIndex(),
		client:      NewBinderClient(kubeClient),
	}

	// Informer feeding volume events into the binder.
	volumeSource := &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			return kubeClient.PersistentVolumes().List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			return kubeClient.PersistentVolumes().Watch(options)
		},
	}
	volumeHandlers := framework.ResourceEventHandlerFuncs{
		AddFunc:    binder.addVolume,
		UpdateFunc: binder.updateVolume,
		DeleteFunc: binder.deleteVolume,
	}
	// TODO: Can we have much longer period here?
	_, binder.volumeController = framework.NewInformer(volumeSource, &api.PersistentVolume{}, syncPeriod, volumeHandlers)

	// Informer feeding claim events (from every namespace) into the binder.
	claimSource := &cache.ListWatch{
		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
			return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(options)
		},
		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
			return kubeClient.PersistentVolumeClaims(api.NamespaceAll).Watch(options)
		},
	}
	claimHandlers := framework.ResourceEventHandlerFuncs{
		AddFunc:    binder.addClaim,
		UpdateFunc: binder.updateClaim,
		DeleteFunc: binder.deleteClaim,
	}
	// TODO: Can we have much longer period here?
	_, binder.claimController = framework.NewInformer(claimSource, &api.PersistentVolumeClaim{}, syncPeriod, claimHandlers)

	return binder
}
// addVolume is the informer callback for newly observed PersistentVolumes.
// It synchronizes the volume while holding the binder lock; objects of any
// other type are logged and ignored.
func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
	binder.lock.Lock()
	defer binder.lock.Unlock()

	switch volume := obj.(type) {
	case *api.PersistentVolume:
		syncErr := syncVolume(binder.volumeIndex, binder.client, volume)
		if syncErr != nil {
			glog.Errorf("PVClaimBinder could not add volume %s: %+v", volume.Name, syncErr)
		}
	default:
		glog.Errorf("Expected PersistentVolume but handler received %+v", obj)
	}
}
// updateVolume is the informer callback for changed PersistentVolumes.
// The new state is written to the local index first; only if that succeeds is
// the volume synchronized. oldObj is not consulted.
func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {
	binder.lock.Lock()
	defer binder.lock.Unlock()

	volume, isVolume := newObj.(*api.PersistentVolume)
	if !isVolume {
		glog.Errorf("Expected PersistentVolume but handler received %+v", newObj)
		return
	}

	indexErr := binder.volumeIndex.Update(volume)
	if indexErr != nil {
		glog.Errorf("Error updating volume %s in index: %v", volume.Name, indexErr)
		return
	}

	syncErr := syncVolume(binder.volumeIndex, binder.client, volume)
	if syncErr != nil {
		glog.Errorf("PVClaimBinder could not update volume %s: %+v", volume.Name, syncErr)
	}
}
// deleteVolume is the informer callback for removed PersistentVolumes. It
// drops the volume from the local index while holding the binder lock.
//
// When the delete event itself was missed, the informer delivers a
// cache.DeletedFinalStateUnknown wrapper instead of the object; the wrapper is
// unwrapped here so the stale volume is still removed from the index.
func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
	binder.lock.Lock()
	defer binder.lock.Unlock()

	deleted := obj
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		deleted = tombstone.Obj
	}

	volume, ok := deleted.(*api.PersistentVolume)
	if !ok || volume == nil {
		glog.Errorf("Expected PersistentVolume but handler received %+v", obj)
		return
	}
	if err := binder.volumeIndex.Delete(volume); err != nil {
		glog.Errorf("Error deleting volume %s from index: %v", volume.Name, err)
	}
}
// addClaim is the informer callback for newly observed PersistentVolumeClaims.
// It synchronizes the claim while holding the binder lock; objects of any
// other type are logged and ignored.
func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
	binder.lock.Lock()
	defer binder.lock.Unlock()

	switch pvc := obj.(type) {
	case *api.PersistentVolumeClaim:
		syncErr := syncClaim(binder.volumeIndex, binder.client, pvc)
		if syncErr != nil {
			glog.Errorf("PVClaimBinder could not add claim %s: %+v", pvc.Name, syncErr)
		}
	default:
		glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", obj)
	}
}
// updateClaim is the informer callback for changed PersistentVolumeClaims.
// Only the new state is synchronized; oldObj is not consulted.
func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
	binder.lock.Lock()
	defer binder.lock.Unlock()

	pvc, isClaim := newObj.(*api.PersistentVolumeClaim)
	if !isClaim {
		glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", newObj)
		return
	}

	syncErr := syncClaim(binder.volumeIndex, binder.client, pvc)
	if syncErr != nil {
		glog.Errorf("PVClaimBinder could not update claim %s: %+v", pvc.Name, syncErr)
	}
}
// deleteClaim is the informer callback for removed PersistentVolumeClaims.
// It looks up the volume named by the claim's Spec.VolumeName in the local
// index and synchronizes it immediately.
//
// The claim may arrive directly or wrapped in a cache.DeletedFinalStateUnknown
// when the delete event itself was missed. This is the claim handler, so the
// wrapped object is a PersistentVolumeClaim; it is unwrapped and handled the
// same way as a directly delivered claim.
func (binder *PersistentVolumeClaimBinder) deleteClaim(obj interface{}) {
	binder.lock.Lock()
	defer binder.lock.Unlock()

	deleted := obj
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		deleted = tombstone.Obj
	}

	claim, ok := deleted.(*api.PersistentVolumeClaim)
	if !ok || claim == nil {
		glog.Errorf("Expected PersistentVolumeClaim but handler received %+v", obj)
		return
	}

	pvObj, exists, err := binder.volumeIndex.GetByKey(claim.Spec.VolumeName)
	if err != nil {
		glog.Errorf("Error looking up volume %s for deleted claim %s: %v", claim.Spec.VolumeName, claim.Name, err)
		return
	}
	if !exists {
		// The claim was never bound, or its volume is no longer indexed.
		return
	}
	volume, ok := pvObj.(*api.PersistentVolume)
	if !ok || volume == nil {
		return
	}

	// sync the volume when its claim is deleted. Explicitly sync'ing the volume here in response to
	// claim deletion prevents the volume from waiting until the next sync period for its Release.
	if err := syncVolume(binder.volumeIndex, binder.client, volume); err != nil {
		glog.Errorf("PVClaimBinder could not update volume %s from deleteClaim handler: %+v", volume.Name, err)
	}
}
// syncVolume evaluates the current phase of a PersistentVolume and, when
// warranted, advances it to the next phase.
//
// The volume is added to volumeIndex if it is not already there. Volumes that
// are still being provisioned are skipped. A phase change is persisted through
// binderClient.UpdatePersistentVolumeStatus and the returned object is written
// back to the index.
//
// It returns an error when the index lookup fails, when a claim lookup fails
// for a reason other than "not found", when a volume in the Bound, Released or
// Failed phase has no ClaimRef, or when persisting a change fails. If the
// status update fails, the phase set on volume is rolled back to its previous
// value before returning.
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
	glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)

	// volumes can be in one of the following states:
	//
	// VolumePending -- default value -- not bound to a claim and not yet processed through this controller.
	// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
	// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
	// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
	// VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler
	currentPhase := volume.Status.Phase
	nextPhase := currentPhase

	_, exists, err := volumeIndex.Get(volume)
	if err != nil {
		return err
	}
	if !exists {
		volumeIndex.Add(volume)
	}

	if isBeingProvisioned(volume) {
		glog.V(4).Infof("Skipping PersistentVolume[%s], waiting for provisioning to finish", volume.Name)
		return nil
	}

	switch currentPhase {
	case api.VolumePending:
		// 3 possible states:
		// 1. ClaimRef != nil and Claim exists: Prebound to claim. Make volume available for binding (it will match PVC).
		// 2. ClaimRef != nil and Claim !exists: Recently recycled. Remove bind. Make volume available for new claim.
		// 3. ClaimRef == nil: Neither recycled nor prebound. Make volume available for binding.
		nextPhase = api.VolumeAvailable

		if volume.Spec.ClaimRef != nil {
			claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
			if errors.IsNotFound(err) {
				// Pending volumes that have a ClaimRef where the claim is missing were recently recycled.
				// The Recycler set the phase to VolumePending to start the volume at the beginning of this lifecycle.
				// removing ClaimRef unbinds the volume
				clone, err := conversion.NewCloner().DeepCopy(volume)
				if err != nil {
					return fmt.Errorf("Error cloning pv: %v", err)
				}
				volumeClone, ok := clone.(*api.PersistentVolume)
				if !ok {
					// Report the object that failed the assertion; volumeClone is nil here.
					return fmt.Errorf("Unexpected pv cast error : %v\n", clone)
				}
				volumeClone.Spec.ClaimRef = nil

				updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone)
				if err != nil {
					return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
				}
				volume = updatedVolume
				if err := volumeIndex.Update(volume); err != nil {
					// The unbind is already persisted; keep going and let the next update event refresh the index.
					glog.Errorf("Error updating volume %s in index: %v", volume.Name, err)
				}
			} else if err != nil {
				return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err)
			}

			// Dynamically provisioned claims remain Pending until its volume is completely provisioned.
			// The provisioner updates the PV and triggers this update for the volume. Explicitly sync'ing
			// the claim here prevents the need to wait until the next sync period when the claim would normally
			// advance to Bound phase. Otherwise, the maximum wait time for the claim to be Bound is the default sync period.
			if claim != nil && claim.Status.Phase == api.ClaimPending && keyExists(qosProvisioningKey, claim.Annotations) && isProvisioningComplete(volume) {
				if err := syncClaim(volumeIndex, binderClient, claim); err != nil {
					// Best effort: the claim is retried on its own sync, so only report the failure.
					glog.Errorf("PVClaimBinder could not sync claim %s for volume %s: %+v", claim.Name, volume.Name, err)
				}
			}
		}
		glog.V(5).Infof("PersistentVolume[%s] is available\n", volume.Name)

	// available volumes await a claim
	case api.VolumeAvailable:
		if volume.Spec.ClaimRef != nil {
			_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
			switch {
			case err == nil:
				// change of phase will trigger an update event with the newly bound volume
				glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name)
				nextPhase = api.VolumeBound
			case errors.IsNotFound(err):
				nextPhase = api.VolumeReleased
			default:
				// The phase is left untouched; surface the lookup failure instead of dropping it.
				return fmt.Errorf("Error getting PersistentVolumeClaim[%s/%s]: %v", volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name, err)
			}
		}

	// bound volumes require verification of their bound claims
	case api.VolumeBound:
		if volume.Spec.ClaimRef == nil {
			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
		}
		_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
		if err != nil {
			if !errors.IsNotFound(err) {
				return err
			}
			nextPhase = api.VolumeReleased
		}

	// released volumes require recycling
	case api.VolumeReleased:
		if volume.Spec.ClaimRef == nil {
			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
		}
		// another process is watching for released volumes.
		// PersistentVolumeReclaimPolicy is set per PersistentVolume
		//  Recycle - sets the PV to Pending and back under this controller's management
		//  Delete - delete events are handled by this controller's watch. PVs are removed from the index.

	// volumes are removed by processes external to this binder and must be removed from the cluster
	case api.VolumeFailed:
		if volume.Spec.ClaimRef == nil {
			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
		}
		glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name)
	}

	if currentPhase != nextPhase {
		volume.Status.Phase = nextPhase

		// a change in state will trigger another update through this controller.
		// each pass through this controller evaluates current phase and decides whether or not to change to the next phase
		glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase)

		// Use a distinct name for the result so the rollback below touches the
		// volume we were given, not whatever the failed call returned.
		updatedVolume, err := binderClient.UpdatePersistentVolumeStatus(volume)
		if err != nil {
			// Rollback to previous phase
			volume.Status.Phase = currentPhase
			return fmt.Errorf("Error updating PersistentVolume[%s] phase from %s to %s: %v", volume.Name, currentPhase, nextPhase, err)
		}
		if err := volumeIndex.Update(updatedVolume); err != nil {
			return fmt.Errorf("Error updating volume %s in index: %v", volume.Name, err)
		}
	}

	return nil
}
// syncClaim tries to bind a Pending PersistentVolumeClaim to the best matching
// volume in volumeIndex.
//
// For a Pending claim it finds a match, persists a ClaimRef on the volume if it
// has none, records the volume name on the claim, and finally sets the claim's
// status to Bound with the volume's access modes and capacity. It returns nil
// without binding when no volume matches or the match is still being
// provisioned. Bound claims are a no-op; any other phase is an error.
//
// If persisting the claim or its status fails, the fields set on claim are
// restored to their previous values before the error is returned, so a later
// pass starts from the same state.
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
	glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)

	switch claim.Status.Phase {
	case api.ClaimPending:
		// claims w/ a storage-class annotation for provisioning will *only* match volumes with a ClaimRef of the claim.
		volume, err := volumeIndex.findBestMatchForClaim(claim)
		if err != nil {
			return err
		}
		if volume == nil {
			glog.V(5).Infof("A volume match does not exist for persistent claim: %s", claim.Name)
			return nil
		}

		if isBeingProvisioned(volume) {
			glog.V(5).Infof("PersistentVolume[%s] for PersistentVolumeClaim[%s/%s] is still being provisioned.", volume.Name, claim.Namespace, claim.Name)
			return nil
		}

		claimRef, err := api.GetReference(claim)
		if err != nil {
			return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
		}

		// Make a binding reference to the claim by persisting claimRef on the volume.
		// The local cache must be updated with the new bind to prevent subsequent
		// claims from binding to the volume.
		if volume.Spec.ClaimRef == nil {
			clone, err := conversion.NewCloner().DeepCopy(volume)
			if err != nil {
				return fmt.Errorf("Error cloning pv: %v", err)
			}
			volumeClone, ok := clone.(*api.PersistentVolume)
			if !ok {
				// Report the object that failed the assertion; volumeClone is nil here.
				return fmt.Errorf("Unexpected pv cast error : %v\n", clone)
			}
			volumeClone.Spec.ClaimRef = claimRef

			updatedVolume, err := binderClient.UpdatePersistentVolume(volumeClone)
			if err != nil {
				return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
			}
			volume = updatedVolume
			if err := volumeIndex.Update(updatedVolume); err != nil {
				// The bind is already persisted on the volume; keep binding the claim and report the stale index.
				glog.Errorf("Error updating volume %s in index: %v", updatedVolume.Name, err)
			}
		}

		// the bind is persisted on the volume above and will always match the claim in a search.
		// claim would remain Pending if the update fails, so processing this state is idempotent.
		// this only needs to be processed once.
		if claim.Spec.VolumeName != volume.Name {
			previousVolumeName := claim.Spec.VolumeName
			claim.Spec.VolumeName = volume.Name
			updatedClaim, err := binderClient.UpdatePersistentVolumeClaim(claim)
			if err != nil {
				// Undo the local change so the next pass retries the update.
				claim.Spec.VolumeName = previousVolumeName
				return fmt.Errorf("Error updating claim with VolumeName %s: %+v\n", volume.Name, err)
			}
			claim = updatedClaim
		}

		previousStatus := claim.Status
		claim.Status.Phase = api.ClaimBound
		claim.Status.AccessModes = volume.Spec.AccessModes
		claim.Status.Capacity = volume.Spec.Capacity
		if _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim); err != nil {
			// Undo the local change so the claim is still seen as Pending on the next pass.
			claim.Status = previousStatus
			return fmt.Errorf("Unexpected error saving claim status: %+v", err)
		}

	case api.ClaimBound:
		// no-op.  Claim is bound, values from PV are set.  PVCs are technically mutable in the API server
		// and we don't want to handle those changes at this time.

	default:
		return fmt.Errorf("Unknown state for PVC: %#v", claim)
	}

	glog.V(5).Infof("PersistentVolumeClaim[%s] is bound\n", claim.Name)
	return nil
}
// isBeingProvisioned reports whether the volume carries the
// provisioning-required annotation with a value other than the "completed"
// marker. Volumes without that annotation are not being provisioned.
func isBeingProvisioned(volume *api.PersistentVolume) bool {
	state, annotated := volume.Annotations[pvProvisioningRequiredAnnotationKey]
	return annotated && state != pvProvisioningCompletedAnnotationValue
}
// Run starts all of this binder's control loops. Each loop (volumes, claims)
// is launched in its own goroutine with a dedicated stop channel; a loop that
// already has a stop channel registered is not started again.
func (binder *PersistentVolumeClaimBinder) Run() {
	glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")

	if binder.stopChannels == nil {
		binder.stopChannels = make(map[string]chan struct{})
	}

	if _, running := binder.stopChannels["volumes"]; !running {
		stopCh := make(chan struct{})
		binder.stopChannels["volumes"] = stopCh
		go binder.volumeController.Run(stopCh)
	}

	if _, running := binder.stopChannels["claims"]; !running {
		stopCh := make(chan struct{})
		binder.stopChannels["claims"] = stopCh
		go binder.claimController.Run(stopCh)
	}
}
// Stop gracefully shuts down this binder by closing every registered stop
// channel and forgetting it, so a later Run starts the loops again.
func (binder *PersistentVolumeClaimBinder) Stop() {
	glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n")

	for loopName := range binder.stopChannels {
		close(binder.stopChannels[loopName])
		delete(binder.stopChannels, loopName)
	}
}
// binderClient abstracts access to PVs and PVCs
// so the sync functions do not depend on a concrete API client.
type binderClient interface {
// GetPersistentVolume returns the volume with the given name.
GetPersistentVolume(name string) (*api.PersistentVolume, error)
// UpdatePersistentVolume saves the volume and returns the stored object.
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
// DeletePersistentVolume removes the given volume.
DeletePersistentVolume(volume *api.PersistentVolume) error
// UpdatePersistentVolumeStatus saves the volume's status and returns the stored object.
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
// GetPersistentVolumeClaim returns the claim with the given name in the given namespace.
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
// UpdatePersistentVolumeClaim saves the claim and returns the stored object.
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
// UpdatePersistentVolumeClaimStatus saves the claim's status and returns the stored object.
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
// NewBinderClient returns a binderClient that forwards every call to the given
// API client.
func NewBinderClient(c client.Interface) binderClient {
	return &realBinderClient{client: c}
}
// realBinderClient is the binderClient implementation that delegates to an API client.
type realBinderClient struct {
// client is the API client every method forwards to.
client client.Interface
}
// GetPersistentVolume fetches the volume with the given name through the
// wrapped client.
func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
	volumes := c.client.PersistentVolumes()
	return volumes.Get(name)
}
// UpdatePersistentVolume saves the volume through the wrapped client and
// returns the client's result.
func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
	volumes := c.client.PersistentVolumes()
	return volumes.Update(volume)
}
// DeletePersistentVolume deletes the volume, identified by its name, through
// the wrapped client.
func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
	volumes := c.client.PersistentVolumes()
	return volumes.Delete(volume.Name)
}
// UpdatePersistentVolumeStatus saves the volume's status through the wrapped
// client and returns the client's result.
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
	volumes := c.client.PersistentVolumes()
	return volumes.UpdateStatus(volume)
}
// GetPersistentVolumeClaim fetches the claim with the given name from the given
// namespace through the wrapped client.
func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
	claims := c.client.PersistentVolumeClaims(namespace)
	return claims.Get(name)
}
// UpdatePersistentVolumeClaim saves the claim in its own namespace through the
// wrapped client and returns the client's result.
func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
	claims := c.client.PersistentVolumeClaims(claim.Namespace)
	return claims.Update(claim)
}
// UpdatePersistentVolumeClaimStatus saves the claim's status in its own
// namespace through the wrapped client and returns the client's result.
func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
	claims := c.client.PersistentVolumeClaims(claim.Namespace)
	return claims.UpdateStatus(claim)
}