Addressed feedback, improved flow and comments

pull/6/head
markturansky 2015-04-16 13:26:08 -04:00
parent 242567460d
commit fb412e47e4
2 changed files with 110 additions and 85 deletions


@ -45,6 +45,7 @@ import (
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volumeclaimbinder"
)
// CMServer is the main context object for the controller manager.
@ -58,6 +59,7 @@ type CMServer struct {
NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration
NamespaceSyncPeriod time.Duration
PVClaimBinderSyncPeriod time.Duration
RegisterRetryCount int
MachineList util.StringList
SyncNodeList bool
@ -90,6 +92,7 @@ func NewCMServer() *CMServer {
NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 5 * time.Minute,
PVClaimBinderSyncPeriod: 10 * time.Second,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
NodeMilliCPU: 1000,
@ -113,6 +116,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"fewer calls to cloud provider, but may delay addition of new nodes to cluster.")
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder_sync_period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter.")
@ -231,6 +235,9 @@ func (s *CMServer) Run(_ []string) error {
namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod)
namespaceManager.Run()
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient)
pvclaimBinder.Run(s.PVClaimBinderSyncPeriod)
select {}
return nil
}
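For reference, the sync period registered above can be tuned when starting the controller manager. A hypothetical invocation (binary name assumed; the default shown in the diff is 10s):

kube-controller-manager --pvclaimbinder_sync_period=30s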


@ -17,10 +17,10 @@ limitations under the License.
package volumeclaimbinder
import (
"fmt"
"sync"
"time"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
@ -37,6 +37,8 @@ type PersistentVolumeClaimBinder struct {
volumeStore *persistentVolumeOrderedIndex
claimStore cache.Store
client client.Interface
// protects access to binding
lock sync.RWMutex
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
@ -82,6 +84,106 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu
return binder
}
// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists.
func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error {
volume := obj.(*api.PersistentVolume)
glog.V(5).Infof("Synchronizing PersistentVolume[%s]%s\n", volume.Name)
if volume.Spec.ClaimRef != nil {
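// the volume is bound to a claim; make sure its phase reflects the binding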
if volume.Status.Phase == api.VolumeAvailable {
volume.Status.Phase = api.VolumeBound
_, err := controller.client.PersistentVolumes().Update(volume)
if err != nil {
return fmt.Errorf("Error updating pv.status: %v\n", err)
}
}
// verify the volume is still claimed by a user
if claim, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil {
glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
controller.syncPersistentVolumeClaimStatus(volume, claim)
} else {
// claim was deleted by the user.
glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name)
// volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume
volume.Status.Phase = api.VolumeReleased
volume, err = controller.client.PersistentVolumes().UpdateStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv: %+v\n", err)
}
}
}
return nil
}
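// syncPersistentVolumeClaim binds an unbound claim to the best matching available PersistentVolume, if one exists, and updates the claim's status accordingly.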
func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error {
claim := obj.(*api.PersistentVolumeClaim)
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
if claim.Status.VolumeRef != nil {
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name)
return nil
}
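// look for the best available volume in the local store that satisfies this claim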
pv, err := controller.volumeStore.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if pv != nil {
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// make a binding reference to the claim
pv.Spec.ClaimRef = claimRef
pv, err = controller.client.PersistentVolumes().Update(pv)
if err != nil {
// the update failed, so unset ClaimRef on the local copy to keep the volume available for binding
pv.Spec.ClaimRef = nil
return fmt.Errorf("Error updating volume: %+v\n", err)
} else {
glog.V(3).Infof("PersistentVolumeClaim[%s] bound to PersistentVolume[%s]\n", claim.Name, pv.Name)
pv.Status.Phase = api.VolumeBound
err := controller.syncPersistentVolumeClaimStatus(pv, claim)
if err != nil {
return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
}
}
} else {
glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID)
}
return nil
}
// syncPersistentVolumeClaimStatus builds and persists a PVClaim's Status, rolling back to empty values if the update fails
func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) error {
volumeRef, err := api.GetReference(volume)
if err != nil {
return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
}
// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim
claim.Status.Phase = api.ClaimBound
claim.Status.VolumeRef = volumeRef
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
if err != nil {
claim.Status.Phase = api.ClaimPending
claim.Status.VolumeRef = nil
claim.Status.AccessModes = nil
claim.Status.Capacity = nil
}
return err
}
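// Run starts a goroutine that reconciles persistent volumes and claims once per period.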
func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
go util.Forever(func() { controller.synchronize() }, period)
@ -130,87 +232,3 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr
wg.Wait()
}
}
// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists.
func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error {
volume := obj.(*api.PersistentVolume)
glog.V(5).Infof("Synchronizing persistent volume: %s\n", volume.Name)
// verify the volume is still claimed by a user
if volume.Spec.ClaimRef != nil {
if _, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil {
glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
} else {
//claim was deleted by user.
glog.V(3).Infof("PersistentVolumeClaim[UID=%s] unbound from PersistentVolume[UID=%s]\n", volume.Spec.ClaimRef.UID, volume.UID)
volume.Spec.ClaimRef = nil
volume.Status.Phase = api.VolumeReleased
volume, err = controller.client.PersistentVolumes().Update(volume)
if err != nil {
glog.V(3).Infof("Error updating volume: %+v\n", err)
}
}
}
return nil
}
func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error {
claim := obj.(*api.PersistentVolumeClaim)
glog.V(5).Infof("Synchronizing persistent volume claim: %s\n", claim.Name)
if claim.Status.VolumeRef != nil {
glog.V(5).Infof("PersistentVolumeClaim[UID=%s] is bound to PersistentVolume[UID=%s]\n", claim.Name, claim.Status.VolumeRef.Name)
return nil
}
volume, err := controller.volumeStore.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if volume != nil {
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
volumeRef, err := api.GetReference(volume)
if err != nil {
return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
}
// make a binding reference to the claim
volume.Spec.ClaimRef = claimRef
volume, err = controller.client.PersistentVolumes().Update(volume)
if err != nil {
glog.V(3).Infof("Error updating volume: %+v\n", err)
} else {
// all "actuals" are transferred from PV to PVC so the user knows what
// type of volume they actually got for their claim
claim.Status.Phase = api.ClaimBound
claim.Status.VolumeRef = volumeRef
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
if err != nil {
glog.V(3).Infof("Error updating claim: %+v\n", err)
// unset ClaimRef on the pointer to make it available for binding again
volume.Spec.ClaimRef = nil
volume, err = controller.client.PersistentVolumes().Update(volume)
// unset VolumeRef on the pointer so this claim can be processed next sync loop
claim.Status.VolumeRef = nil
} else {
glog.V(2).Infof("PersistentVolumeClaim[UID=%s] bound to PersistentVolume[UID=%s]\n", claim.UID, volume.UID)
}
}
} else {
glog.V(5).Infof("No volume match found for %s\n", claim.UID)
}
return nil
}