narrowed client interface to allow easier testing. added PVC deletion test case.

pull/6/head
markturansky 2015-04-16 22:09:20 -04:00
parent f26df6a983
commit e1b885c9ad
2 changed files with 125 additions and 23 deletions

View File

@ -36,7 +36,8 @@ import (
type PersistentVolumeClaimBinder struct {
volumeStore *persistentVolumeOrderedIndex
claimStore cache.Store
client client.Interface
client binderClient
// protects access to binding
lock sync.RWMutex
}
@ -78,7 +79,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu
binder := &PersistentVolumeClaimBinder{
volumeStore: volumeStore,
claimStore: claimStore,
client: kubeClient,
client: NewBinderClient(kubeClient),
}
return binder
@ -87,31 +88,37 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface) *PersistentVolu
// syncPersistentVolume inspects all bound PVs to determine if their bound PersistentVolumeClaim still exists.
func (controller *PersistentVolumeClaimBinder) syncPersistentVolume(obj interface{}) error {
volume := obj.(*api.PersistentVolume)
glog.V(5).Infof("Synchronizing PersistentVolume[%s]%s\n", volume.Name)
glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
if volume.Spec.ClaimRef != nil {
if volume.Status.Phase == api.VolumeAvailable {
volume.Status.Phase = api.VolumeBound
_, err := controller.client.PersistentVolumes().Update(volume)
_, err := controller.client.UpdatePersistentVolumeStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv.status: %v\n", err)
}
}
// verify the volume is still claimed by a user
if claim, err := controller.client.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name); err == nil {
if claim, err := controller.client.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil {
glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name)
controller.syncPersistentVolumeClaimStatus(volume, claim)
} else {
//claim was deleted by user.
glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name)
// volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly release volume
// volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume
volume.Status.Phase = api.VolumeReleased
volume, err = controller.client.PersistentVolumes().UpdateStatus(volume)
volume, err = controller.client.UpdatePersistentVolumeStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv: %+v\n", err)
}
}
} else {
volume.Status.Phase = api.VolumeAvailable
_, err := controller.client.UpdatePersistentVolumeStatus(volume)
if err != nil {
return fmt.Errorf("Error updating pv.status: %v\n", err)
}
}
return nil
}
@ -141,7 +148,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj int
// make a binding reference to the claim
pv.Spec.ClaimRef = claimRef
pv, err = controller.client.PersistentVolumes().Update(pv)
pv, err = controller.client.UpdatePersistentVolume(pv)
if err != nil {
// volume no longer bound
@ -157,6 +164,11 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaim(obj int
}
} else {
glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID)
claim.Status.Phase = api.ClaimPending
_, err := controller.client.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
return fmt.Errorf("Error updating pvclaim.status: %v\n", err)
}
}
return nil
}
@ -174,7 +186,7 @@ func (controller *PersistentVolumeClaimBinder) syncPersistentVolumeClaimStatus(v
claim.Status.AccessModes = volume.Spec.AccessModes
claim.Status.Capacity = volume.Spec.Capacity
_, err = controller.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
_, err = controller.client.UpdatePersistentVolumeClaimStatus(claim)
if err != nil {
claim.Status.Phase = api.ClaimPending
@ -192,7 +204,8 @@ func (controller *PersistentVolumeClaimBinder) Run(period time.Duration) {
}
// Synchronizer is a generic List/ProcessFunc used by the Reconcile function & reconciliation loop,
// because we're reconciling two Kinds in this component and I didn't want to dupe the loop
// because we're reconciling two Kinds in this component and don't want to dupe the loop
// TODO MarkT - refactor to new DeltaFifo and new controller framework
type Synchronizer struct {
ListFunc func() []interface{}
ReconcileFunc func(interface{}) error
@ -221,7 +234,7 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr
wg := sync.WaitGroup{}
wg.Add(len(items))
for ix := range items {
func(ix int) {
go func(ix int) {
defer wg.Done()
obj := items[ix]
glog.V(5).Infof("Reconciliation of %v", obj)
@ -234,3 +247,45 @@ func (controller *PersistentVolumeClaimBinder) reconcile(synchronizers ...Synchr
wg.Wait()
}
}
// binderClient abstracts access to PVs and PVCs
type binderClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
}
func NewBinderClient(c client.Interface) binderClient {
return &realBinderClient{c}
}
type realBinderClient struct {
client client.Interface
}
func (c *realBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
}
func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
}
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}
func (c *realBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(namespace).Get(name)
}
func (c *realBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).Update(claim)
}
func (c *realBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return c.client.PersistentVolumeClaims(claim.Namespace).UpdateStatus(claim)
}

View File

@ -20,6 +20,7 @@ import (
"reflect"
"testing"
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
@ -166,19 +167,27 @@ func TestBindingWithExamples(t *testing.T) {
t.Error("Unexpected error getting PVC from client: %v", err)
}
controller := NewPersistentVolumeClaimBinder(client)
err = controller.volumeStore.Add(pv)
if err != nil {
t.Error("Unexpected error: %v", err)
mockClient := &mockBinderClient{
volume: pv,
claim: claim,
}
if _, exists, _ := controller.volumeStore.Get(pv); !exists {
t.Error("Expected to find volume in the index")
controller := PersistentVolumeClaimBinder{
volumeStore: NewPersistentVolumeOrderedIndex(),
client: mockClient,
}
err = controller.syncPersistentVolumeClaim(claim)
if err != nil {
t.Error("Unexpected error: %v", err)
controller.volumeStore.Add(pv)
controller.syncPersistentVolume(pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
controller.syncPersistentVolumeClaim(claim)
if pv.Status.Phase != api.VolumeBound {
t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase)
}
if claim.Status.VolumeRef == nil {
@ -192,9 +201,47 @@ func TestBindingWithExamples(t *testing.T) {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
}
if claim.Status.AccessModes[0] != pv.Spec.AccessModes[0] {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
t.Errorf("Expected access mode %s but got %s", claim.Status.AccessModes[0], pv.Spec.AccessModes[0])
}
if claim.Status.Phase != api.ClaimBound {
t.Errorf("Expected phase %s but got %s", api.ClaimBound, claim.Status.Phase)
// pretend the user deleted their claim
mockClient.claim = nil
controller.syncPersistentVolume(pv)
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
}
}
type mockBinderClient struct {
volume *api.PersistentVolume
claim *api.PersistentVolumeClaim
}
func (c *mockBinderClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.volume, nil
}
func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) {
if c.claim != nil {
return c.claim, nil
} else {
return nil, fmt.Errorf("Claim does not exist")
}
}
func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}