Refactored to DeltaFifo

pull/6/head
markturansky 2015-04-18 06:54:33 -04:00
parent 49883e7d01
commit b634f17ca7
3 changed files with 190 additions and 145 deletions

View File

@ -235,8 +235,8 @@ func (s *CMServer) Run(_ []string) error {
namespaceManager := namespace.NewNamespaceManager(kubeClient, s.NamespaceSyncPeriod)
namespaceManager.Run()
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient)
pvclaimBinder.Run(s.PVClaimBinderSyncPeriod)
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
select {}
return nil

View File

@ -18,34 +18,35 @@ package volumeclaimbinder
import (
"fmt"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
"reflect"
)
// PersistentVolumeClaimBinder is a controller that synchronizes PersistentVolumeClaims.
type PersistentVolumeClaimBinder struct {
volumeStore *persistentVolumeOrderedIndex
claimStore cache.Store
client binderClient
// protects access to binding
lock sync.RWMutex
volumeIndex *persistentVolumeOrderedIndex
volumeController *framework.Controller
claimController *framework.Controller
client binderClient
}
// NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder
func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolumeClaimBinder {
volumeStore := NewPersistentVolumeOrderedIndex()
volumeReflector := cache.NewReflector(
func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder {
volumeIndex := NewPersistentVolumeOrderedIndex()
binderClient := NewBinderClient(kubeClient)
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
@ -55,13 +56,28 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu
},
},
&api.PersistentVolume{},
volumeStore,
0,
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
volume := obj.(*api.PersistentVolume)
volumeIndex.Indexer.Add(volume)
syncVolume(binderClient, volume)
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldVolume := oldObj.(*api.PersistentVolume)
newVolume := newObj.(*api.PersistentVolume)
volumeIndex.Indexer.Update(newVolume)
if updateRequired(oldVolume, newVolume) {
syncVolume(binderClient, newVolume)
}
},
DeleteFunc: func(obj interface{}) {
volume := obj.(*api.PersistentVolume)
volumeIndex.Indexer.Delete(volume)
},
},
)
volumeReflector.Run()
claimStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
claimReflector := cache.NewReflector(
_, claimController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumeClaims(api.NamespaceAll).List(labels.Everything(), fields.Everything())
@ -71,51 +87,75 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu
},
},
&api.PersistentVolumeClaim{},
claimStore,
0,
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
claim := obj.(*api.PersistentVolumeClaim)
syncClaim(volumeIndex, binderClient, claim)
},
UpdateFunc: func(oldObj, newObj interface{}) {
// oldClaim := newObj.(*api.PersistentVolumeClaim)
newClaim := newObj.(*api.PersistentVolumeClaim)
if newClaim.Status.VolumeRef == nil {
syncClaim(volumeIndex, binderClient, newClaim)
}
},
},
)
claimReflector.Run()
binder := &PersistentVolumeClaimBinder{
volumeStore: volumeStore,
claimStore: claimStore,
client: NewBinderClient(kubeClient),
volumeController: volumeController,
claimController: claimController,
volumeIndex: volumeIndex,
client: binderClient,
}
return binder
}
// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists.
func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error {
volume := obj.(*api.PersistentVolume)
func updateRequired(oldVolume, newVolume *api.PersistentVolume) bool {
// Spec changes affect indexing and sorting volumes
if !reflect.DeepEqual(oldVolume.Spec, newVolume.Spec) {
return true
}
if !reflect.DeepEqual(oldVolume.Status, newVolume.Status) {
return true
}
return false
}
func syncVolume(binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
if volume.Spec.ClaimRef != nil {
if volume.Status.Phase == api.VolumeAvailable {
volume.Status.Phase = api.VolumeBound
_, err := controller.client.UpdatePersistentVolumeStatus(volume)
_, err := binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv.status: %v\n", err)
}
}
// verify the volume is still claimed by a user
if claim, err := controller.client.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil {
if claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil {
glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
controller.syncPersistentVolumeClaimStatus(volume, claim)
// rebuild the Claim's Status as needed
if claim.Status.VolumeRef == nil {
syncClaimStatus(binderClient, volume, claim)
}
} else {
//claim was deleted by user.
glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name)
// volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume
volume.Status.Phase = api.VolumeReleased
volume, err = controller.client.UpdatePersistentVolumeStatus(volume)
volume, err = binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv: %+v\n", err)
}
}
} else {
volume.Status.Phase = api.VolumeAvailable
_, err := controller.client.UpdatePersistentVolumeStatus(volume)
_, err := binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv.status: %v\n", err)
}
@ -123,58 +163,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interfac
return nil
}
func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj interface{}) error {
controller.lock.Lock()
defer controller.lock.Unlock()
claim := obj.(*api.PersistentVolumeClaim)
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
if claim.Status.VolumeRef != nil {
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name)
return nil
}
pv, err := controller.volumeStore.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if pv != nil {
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// make a binding reference to the claim
pv.Spec.ClaimRef = claimRef
pv, err = controller.client.UpdatePersistentVolume(pv)
if err != nil {
// volume no longer bound
pv.Spec.ClaimRef = nil
return fmt.Errorf("Error updating volume: %+v\n", err)
} else {
glog.V(3).Infof("PersistentVolumeClaim[%s] bound to PersistentVolume[%s]\n", claim.Name, pv.Name)
pv.Status.Phase = api.VolumeBound
err := controller.syncPersistentVolumeClaimStatus(pv, claim)
if err != nil {
return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
}
}
} else {
glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID)
claim.Status.Phase = api.ClaimPending
_, err := controller.client.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
}
}
return nil
}
// syncPersistentVolumeClaimStatus builds and persistens a PVClaim's Status, rolling back to empty values if the update fails
func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) error {
func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, claim *api.PersistentVolumeClaim) (err error) {
volumeRef, err := api.GetReference(volume)
if err != nil {
return fmt.Errorf("Unexpected error getting volume reference: %v\n", err)
@ -186,7 +175,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = controller.client.UpdatePersistentVolumeClaimStatus(claim)
_, err = binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
claim.Status.Phase = api.ClaimPending
@ -198,54 +187,56 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v
return err
}
func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) {
func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name)
if claim.Status.VolumeRef != nil {
glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name)
return nil
}
volume, err := volumeIndex.FindBestMatchForClaim(claim)
if err != nil {
return err
}
if volume != nil {
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
}
// make a binding reference to the claim
volume.Spec.ClaimRef = claimRef
volume, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
// volume no longer bound
volume.Spec.ClaimRef = nil
return fmt.Errorf("Error updating volume: %+v\n", err)
} else {
err = syncClaimStatus(binderClient, volume, claim)
if err != nil {
return fmt.Errorf("Error update claim.status: %+v\n", err)
}
}
} else {
glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID)
if claim.Status.Phase != api.ClaimPending {
claim.Status.Phase = api.ClaimPending
_, err := binderClient.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
}
}
}
return nil
}
func (controller *PersistentVolumeClaimBinder) Run() {
glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n")
go util.Forever(func() { controller.synchronize() }, period)
}
// Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop,
// because we're reconciling two Kinds in this component and don't want to dupe the loop
// TODO MarkT - refactor to new DeltaFifo and new controller framework
type Synchronizer struct {
ListFunc func() []interface{}
ReconcileFunc func(interface{}) error
}
func (controller *PersistentVolumeClaimBinder) synchronize() {
volumeSynchronizer := Synchronizer{
ListFunc: controller.volumeStore.List,
ReconcileFunc: controller.syncPersistentVolume,
}
claimsSynchronizer := Synchronizer{
ListFunc: controller.claimStore.List,
ReconcileFunc: controller.syncPersistentVolumeClaim,
}
controller.reconcile(volumeSynchronizer, claimsSynchronizer)
}
func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchronizer) {
for _, synchronizer := range synchronizers {
items := synchronizer.ListFunc()
if len(items) == 0 {
continue
}
wg := sync.WaitGroup{}
wg.Add(len(items))
for ix := range items {
go func(ix int) {
defer wg.Done()
obj := items[ix]
glog.V(5).Infof("Reconciliation of %v", obj)
err := synchronizer.ReconcileFunc(obj)
if err != nil {
glog.Errorf("Error reconciling: %v", err)
}
}(ix)
}
wg.Wait()
}
go controller.claimController.Run(make(chan struct{}))
go controller.volumeController.Run(make(chan struct{}))
}
// binderClient abstracts access to PVs and PVCs

View File

@ -144,6 +144,54 @@ func TestExampleObjects(t *testing.T) {
}
}
func TestRequiresUpdate(t *testing.T) {
old := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
},
}
new := &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.AccessModeType{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
ClaimRef: &api.ObjectReference{Name: "foo"},
},
}
if !updateRequired(old, new) {
t.Errorf("Update expected for the new volume with added ClaimRef")
}
old.Spec.ClaimRef = new.Spec.ClaimRef
old.Status.Phase = api.VolumeBound
if !updateRequired(old, new) {
t.Errorf("Update expected for the new volume with added Status")
}
new.Status.Phase = old.Status.Phase
if updateRequired(old, new) {
t.Errorf("No updated expected for identical objects")
}
}
func TestBindingWithExamples(t *testing.T) {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
@ -167,33 +215,39 @@ func TestBindingWithExamples(t *testing.T) {
t.Error("Unexpected error getting PVC from client: %v", err)
}
volumeIndex := NewPersistentVolumeOrderedIndex()
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
controller := PersistentVolumeClaimBinder{
volumeStore: NewPersistentVolumeOrderedIndex(),
client: mockClient,
}
controller.volumeStore.Add(pv)
controller.syncPersistentVolume(pv)
volumeIndex.Add(pv)
syncVolume(mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
controller.syncPersistentVolumeClaim(claim)
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
if pv.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef but got %+v\n", pv.Spec.ClaimRef)
}
syncClaim(volumeIndex, mockClient, claim)
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv)
}
syncVolume(mockClient, pv)
if claim.Status.VolumeRef == nil {
t.Error("Expected claim to be bound to volume")
}
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
@ -206,8 +260,8 @@ func TestBindingWithExamples(t *testing.T) {
// pretend the user deleted their claim
mockClient.claim = nil
syncVolume(mockClient, pv)
controller.syncPersistentVolume(pv)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
}