diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index 774bda50e8..e62d790a2a 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -201,6 +201,14 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { c.FuzzNoCustom(s) // fuzz self without calling this function again s.Type = api.SecretTypeOpaque }, + func(pv *api.PersistentVolume, c fuzz.Continue) { + c.FuzzNoCustom(pv) // fuzz self without calling this function again + pv.Status.Phase = api.VolumePending + }, + func(pvc *api.PersistentVolumeClaim, c fuzz.Continue) { + c.FuzzNoCustom(pvc) // fuzz self without calling this function again + pvc.Status.Phase = api.ClaimPending + }, func(s *api.NamespaceSpec, c fuzz.Continue) { s.Finalizers = []api.FinalizerName{api.FinalizerKubernetes} }, diff --git a/pkg/api/types.go b/pkg/api/types.go index ccbd8d2957..f80f7fb332 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -317,6 +317,8 @@ const ( type PersistentVolumePhase string const ( + // used for PersistentVolumes that are not available + VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go index 4d8a9b9cdf..d56ba77090 100644 --- a/pkg/api/v1beta1/defaults.go +++ b/pkg/api/v1beta1/defaults.go @@ -111,6 +111,16 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *PersistentVolume) { + if obj.Status.Phase == "" { + obj.Status.Phase = VolumePending + } + }, + func(obj *PersistentVolumeClaim) { + if obj.Status.Phase == "" { + obj.Status.Phase = ClaimPending + } + }, func(obj *Endpoints) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go index 3f304d1a51..5822d69bc0 100644 --- a/pkg/api/v1beta1/defaults_test.go +++ b/pkg/api/v1beta1/defaults_test.go @@ -164,6 +164,26 @@ func TestSetDefaultSecret(t *testing.T) { } } +func TestSetDefaultPersistentVolume(t *testing.T) { + pv := ¤t.PersistentVolume{} + obj2 := roundTrip(t, runtime.Object(pv)) + pv2 := obj2.(*current.PersistentVolume) + + if pv2.Status.Phase != current.VolumePending { + t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase) + } +} + +func TestSetDefaultPersistentVolumeClaim(t *testing.T) { + pvc := ¤t.PersistentVolumeClaim{} + obj2 := roundTrip(t, runtime.Object(pvc)) + pvc2 := obj2.(*current.PersistentVolumeClaim) + + if pvc2.Status.Phase != current.ClaimPending { + t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase) + } +} + // Test that we use "legacy" fields if "modern" fields are not provided. func TestSetDefaulEndpointsLegacy(t *testing.T) { in := ¤t.Endpoints{ diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index bf977cbac7..3480b530cd 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -229,6 +229,8 @@ const ( type PersistentVolumePhase string const ( + // used for PersistentVolumes that are not available + VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go index 27516e4b61..f8830ed46e 100644 --- a/pkg/api/v1beta2/defaults.go +++ b/pkg/api/v1beta2/defaults.go @@ -112,6 +112,16 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *PersistentVolume) { + if obj.Status.Phase == "" { + obj.Status.Phase = VolumePending + } + }, + func(obj *PersistentVolumeClaim) { + if obj.Status.Phase == "" { + obj.Status.Phase = ClaimPending + } + }, func(obj *Endpoints) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go index 112a362c9e..0c9b7173da 100644 --- a/pkg/api/v1beta2/defaults_test.go +++ b/pkg/api/v1beta2/defaults_test.go @@ -154,6 +154,26 @@ func TestSetDefaultService(t *testing.T) { } } +func TestSetDefaultPersistentVolume(t *testing.T) { + pv := ¤t.PersistentVolume{} + obj2 := roundTrip(t, runtime.Object(pv)) + pv2 := obj2.(*current.PersistentVolume) + + if pv2.Status.Phase != current.VolumePending { + t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase) + } +} + +func TestSetDefaultPersistentVolumeClaim(t *testing.T) { + pvc := ¤t.PersistentVolumeClaim{} + obj2 := roundTrip(t, runtime.Object(pvc)) + pvc2 := obj2.(*current.PersistentVolumeClaim) + + if pvc2.Status.Phase != current.ClaimPending { + t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase) + } +} + func TestSetDefaultSecret(t *testing.T) { s := ¤t.Secret{} obj2 := roundTrip(t, runtime.Object(s)) diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 8b86646ec3..59f2e52732 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -198,6 +198,8 @@ const ( type PersistentVolumePhase string const ( + // used for PersistentVolumes that are not available + VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index a5a21b61a2..9a3e859b11 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -102,6 +102,16 @@ func init() { obj.Type = SecretTypeOpaque } }, + func(obj *PersistentVolume) { + if obj.Status.Phase == "" { + obj.Status.Phase = VolumePending + } + }, + func(obj *PersistentVolumeClaim) { + if obj.Status.Phase == "" { + obj.Status.Phase = ClaimPending + } + }, func(obj *Endpoints) { for i := range obj.Subsets { ss := &obj.Subsets[i] diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index 1d5e07b89f..bbd7d3ba81 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -174,6 +174,26 @@ func TestSetDefaultSecret(t *testing.T) { } } +func TestSetDefaultPersistentVolume(t *testing.T) { + pv := ¤t.PersistentVolume{} + obj2 := roundTrip(t, runtime.Object(pv)) + pv2 := obj2.(*current.PersistentVolume) + + if pv2.Status.Phase != current.VolumePending { + t.Errorf("Expected volume phase %v, got %v", current.VolumePending, pv2.Status.Phase) + } +} + +func TestSetDefaultPersistentVolumeClaim(t *testing.T) { + pvc := ¤t.PersistentVolumeClaim{} + obj2 := roundTrip(t, runtime.Object(pvc)) + pvc2 := obj2.(*current.PersistentVolumeClaim) + + if pvc2.Status.Phase != current.ClaimPending { + t.Errorf("Expected claim phase %v, got %v", current.ClaimPending, pvc2.Status.Phase) + } +} + func TestSetDefaulEndpointsProtocol(t *testing.T) { in := ¤t.Endpoints{Subsets: []current.EndpointSubset{ {Ports: []current.EndpointPort{{}, {Protocol: "UDP"}, {}}}, diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 4c2a62a61c..a252437c58 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -334,6 +334,8 @@ const ( type PersistentVolumePhase string const ( + // used for PersistentVolumes that are not available + VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound diff --git a/pkg/registry/persistentvolume/etcd/etcd_test.go b/pkg/registry/persistentvolume/etcd/etcd_test.go index 8a83922205..5ad2ebf6d6 100644 --- a/pkg/registry/persistentvolume/etcd/etcd_test.go +++ b/pkg/registry/persistentvolume/etcd/etcd_test.go @@ -58,6 +58,9 @@ func validNewPersistentVolume(name string) *api.PersistentVolume { HostPath: &api.HostPathVolumeSource{Path: "/foo"}, }, }, + Status: api.PersistentVolumeStatus{ + Phase: api.VolumePending, + }, } return pv } diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go index e3c44ce4ee..4307724d02 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go @@ -59,6 +59,9 @@ func validNewPersistentVolumeClaim(name, ns string) *api.PersistentVolumeClaim { }, }, }, + Status: api.PersistentVolumeClaimStatus{ + Phase: api.ClaimPending, + }, } return pv } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index 90576f7491..bc097f260f 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -18,10 +18,11 @@ package volumeclaimbinder import ( "fmt" - "reflect" + "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" @@ -40,12 +41,17 @@ type PersistentVolumeClaimBinder struct { claimController *framework.Controller client binderClient stopChannels map[string]chan struct{} + lock sync.RWMutex } // NewPersistentVolumeClaimBinder creates a new PersistentVolumeClaimBinder func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time.Duration) *PersistentVolumeClaimBinder { volumeIndex := NewPersistentVolumeOrderedIndex() binderClient := NewBinderClient(kubeClient) + binder := &PersistentVolumeClaimBinder{ + volumeIndex: volumeIndex, + client: binderClient, + } _, volumeController := framework.NewInformer( &cache.ListWatch{ @@ -59,23 +65,9 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time &api.PersistentVolume{}, syncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - volume := obj.(*api.PersistentVolume) - volumeIndex.Indexer.Add(volume) - syncVolume(binderClient, volume) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldVolume := oldObj.(*api.PersistentVolume) - newVolume := newObj.(*api.PersistentVolume) - volumeIndex.Indexer.Update(newVolume) - if updateRequired(oldVolume, newVolume) { - syncVolume(binderClient, newVolume) - } - }, - DeleteFunc: func(obj interface{}) { - volume := obj.(*api.PersistentVolume) - volumeIndex.Indexer.Delete(volume) - }, + AddFunc: binder.addVolume, + UpdateFunc: binder.updateVolume, + DeleteFunc: binder.deleteVolume, }, ) _, claimController := framework.NewInformer( @@ -90,75 +82,186 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time &api.PersistentVolumeClaim{}, syncPeriod, framework.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - claim := obj.(*api.PersistentVolumeClaim) - syncClaim(volumeIndex, binderClient, claim) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - newClaim := newObj.(*api.PersistentVolumeClaim) - if newClaim.Status.VolumeRef == nil { - syncClaim(volumeIndex, binderClient, newClaim) - } - }, + AddFunc: binder.addClaim, + UpdateFunc: binder.updateClaim, }, ) - binder := &PersistentVolumeClaimBinder{ - volumeController: volumeController, - claimController: claimController, - volumeIndex: volumeIndex, - client: binderClient, - } + binder.claimController = claimController + binder.volumeController = volumeController return binder } -func updateRequired(oldVolume, newVolume *api.PersistentVolume) bool { - // Spec changes affect indexing and sorting volumes - if !reflect.DeepEqual(oldVolume.Spec, newVolume.Spec) { - return true - } - if !reflect.DeepEqual(oldVolume.Status, newVolume.Status) { - return true - } - return false +func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + volume := obj.(*api.PersistentVolume) + syncVolume(binder.volumeIndex, binder.client, volume) } -func syncVolume(binderClient binderClient, volume *api.PersistentVolume) (err error) { +func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + newVolume := newObj.(*api.PersistentVolume) + binder.volumeIndex.Update(newVolume) + syncVolume(binder.volumeIndex, binder.client, newVolume) +} + +func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + volume := obj.(*api.PersistentVolume) + binder.volumeIndex.Delete(volume) +} + +func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + claim := obj.(*api.PersistentVolumeClaim) + syncClaim(binder.volumeIndex, binder.client, claim) +} + +func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) { + binder.lock.Lock() + defer binder.lock.Unlock() + newClaim := newObj.(*api.PersistentVolumeClaim) + syncClaim(binder.volumeIndex, binder.client, newClaim) +} + +func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) { glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) - if volume.Spec.ClaimRef != nil { - if volume.Status.Phase == api.VolumeAvailable { - volume.Status.Phase = api.VolumeBound - _, err := binderClient.UpdatePersistentVolumeStatus(volume) - if err != nil { - return fmt.Errorf("Error updating pv.status: %v\n", err) + // volumes can be in one of the following states: + // + // VolumePending -- default value -- not bound to a claim and not yet processed through this controller. + // VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex. + // VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct. + // VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user. + currentPhase := volume.Status.Phase + nextPhase := currentPhase + + switch currentPhase { + // pending volumes are available only after indexing in order to be matched to claims. + case api.VolumePending: + _, exists, err := volumeIndex.Get(volume) + if err != nil { + return err + } + if !exists { + volumeIndex.Add(volume) + } + glog.V(5).Infof("PersistentVolume[%s] is now available\n", volume.Name) + nextPhase = api.VolumeAvailable + + // available volumes await a claim + case api.VolumeAvailable: + if volume.Spec.ClaimRef != nil { + _, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) + if err == nil { + // change of phase will trigger an update event with the newly bound volume + glog.V(5).Infof("PersistentVolume[%s] is now bound\n", volume.Name) + nextPhase = api.VolumeBound + } else { + if errors.IsNotFound(err) { + nextPhase = api.VolumeReleased + } } } + //bound volumes require verification of their bound claims + case api.VolumeBound: + if volume.Spec.ClaimRef == nil { + return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume) + } else { + claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) + if err == nil { + // bound and active. Build claim status as needed. + if claim.Status.VolumeRef == nil { + syncClaimStatus(binderClient, volume, claim) + } + } else { + if errors.IsNotFound(err) { + nextPhase = api.VolumeReleased + } else { + return err + } + } + } + // released volumes require recycling + case api.VolumeReleased: + if volume.Spec.ClaimRef == nil { + return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume) + } else { + // TODO: implement Recycle method on plugins + } + } + + if currentPhase != nextPhase { + volume.Status.Phase = nextPhase + volume, err := binderClient.UpdatePersistentVolumeStatus(volume) + if err != nil { + // Rollback to previous phase + volume.Status.Phase = currentPhase + } + volumeIndex.Update(volume) + } + + return nil +} + +func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) { + glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) + + // claims can be in one of the following states: + // + // ClaimPending -- default value -- not bound to a claim. A volume that matches the claim may not exist. + // ClaimBound -- bound to a volume. claim.Status.VolumeRef != nil + currentPhase := claim.Status.Phase + nextPhase := currentPhase + + switch currentPhase { + + // pending claims await a matching volume + case api.ClaimPending: + volume, err := volumeIndex.FindBestMatchForClaim(claim) + if err != nil { + return err + } + if volume == nil { + return fmt.Errorf("A volume match does not exist for persistent claim: %s", claim.Name) + } - // verify the volume is still claimed by a user - if claim, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name); err == nil { - glog.V(5).Infof("PersistentVolume[%s] is bound to PersistentVolumeClaim[%s]\n", volume.Name, volume.Spec.ClaimRef.Name) - // rebuild the Claim's Status as needed - if claim.Status.VolumeRef == nil { - syncClaimStatus(binderClient, volume, claim) - } - } else { - //claim was deleted by user. - glog.V(3).Infof("PersistentVolumeClaim[%s] unbound from PersistentVolume[%s]\n", volume.Spec.ClaimRef.Name, volume.Name) - // volume.Spec.ClaimRef is deliberately left non-nil so that another process can recycle the newly released volume - volume.Status.Phase = api.VolumeReleased - volume, err = binderClient.UpdatePersistentVolumeStatus(volume) - if err != nil { - return fmt.Errorf("Error updating pv: %+v\n", err) - } - } - } else { - volume.Status.Phase = api.VolumeAvailable - _, err := binderClient.UpdatePersistentVolumeStatus(volume) + claimRef, err := api.GetReference(claim) if err != nil { - return fmt.Errorf("Error updating pv.status: %v\n", err) + return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) } + + // make a binding reference to the claim. + // triggers update of the volume in this controller, which builds claim status + volume.Spec.ClaimRef = claimRef + volume, err = binderClient.UpdatePersistentVolume(volume) + if err == nil { + nextPhase = api.ClaimBound + } + if err != nil { + // Rollback by unsetting the ClaimRef on the volume pointer. + // the volume in the index will be unbound again and ready to be matched. + volume.Spec.ClaimRef = nil + // Rollback by restoring original phase to claim pointer + nextPhase = api.ClaimPending + return fmt.Errorf("Error updating volume: %+v\n", err) + } + + // bound claims requires no maintenance. Deletion by the user is the last lifecycle phase. + case api.ClaimBound: + // This is the end of a claim's lifecycle. + // After claim deletion, a volume is recycled when it verifies its claim is unbound + glog.V(5).Infof("PersistentVolumeClaime[%s] is bound\n", claim.Name) + } + + if currentPhase != nextPhase { + claim.Status.Phase = nextPhase + binderClient.UpdatePersistentVolumeClaimStatus(claim) } return nil } @@ -168,6 +271,7 @@ func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, cl if err != nil { return fmt.Errorf("Unexpected error getting volume reference: %v\n", err) } + // all "actuals" are transferred from PV to PVC so the user knows what // type of volume they actually got for their claim claim.Status.Phase = api.ClaimBound @@ -176,63 +280,16 @@ func syncClaimStatus(binderClient binderClient, volume *api.PersistentVolume, cl claim.Status.Capacity = volume.Spec.Capacity _, err = binderClient.UpdatePersistentVolumeClaimStatus(claim) - if err != nil { claim.Status.Phase = api.ClaimPending claim.Status.VolumeRef = nil claim.Status.AccessModes = nil claim.Status.Capacity = nil } - return err } -func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, claim *api.PersistentVolumeClaim) (err error) { - glog.V(5).Infof("Synchronizing PersistentVolumeClaim[%s]\n", claim.Name) - - if claim.Status.VolumeRef != nil { - glog.V(5).Infof("PersistentVolumeClaim[%s] is bound to PersistentVolume[%s]\n", claim.Name, claim.Status.VolumeRef.Name) - return nil - } - - volume, err := volumeIndex.FindBestMatchForClaim(claim) - if err != nil { - return err - } - - if volume != nil { - claimRef, err := api.GetReference(claim) - if err != nil { - return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) - } - - // make a binding reference to the claim - volume.Spec.ClaimRef = claimRef - volume, err = binderClient.UpdatePersistentVolume(volume) - - if err != nil { - // volume no longer bound - volume.Spec.ClaimRef = nil - return fmt.Errorf("Error updating volume: %+v\n", err) - } else { - err = syncClaimStatus(binderClient, volume, claim) - if err != nil { - return fmt.Errorf("Error update claim.status: %+v\n", err) - } - } - } else { - glog.V(5).Infof("No volume match found for PersistentVolumeClaim[%s]\n", claim.UID) - if claim.Status.Phase != api.ClaimPending { - claim.Status.Phase = api.ClaimPending - _, err := binderClient.UpdatePersistentVolumeClaimStatus(claim) - if err != nil { - return fmt.Errorf("Error updating pvclaim.status: %v\n", err) - } - } - } - return nil -} - +// Run starts all of this binder's control loops func (controller *PersistentVolumeClaimBinder) Run() { glog.V(5).Infof("Starting PersistentVolumeClaimBinder\n") if controller.stopChannels == nil { @@ -250,6 +307,7 @@ func (controller *PersistentVolumeClaimBinder) Run() { } } +// Stop gracefully shuts down this binder func (controller *PersistentVolumeClaimBinder) Stop() { glog.V(5).Infof("Stopping PersistentVolumeClaimBinder\n") for name, stopChan := range controller.stopChannels { diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index d1999a4a5d..985ec257db 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -17,12 +17,12 @@ limitations under the License. package volumeclaimbinder import ( - "fmt" "reflect" "testing" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" @@ -51,7 +51,6 @@ func TestRunStop(t *testing.T) { } func TestExampleObjects(t *testing.T) { - scenarios := map[string]struct { expected interface{} }{ @@ -167,56 +166,7 @@ func TestExampleObjects(t *testing.T) { } } -func TestRequiresUpdate(t *testing.T) { - old := &api.PersistentVolume{ - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.AccessModeType{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/tmp/data02", - }, - }, - }, - } - - new := &api.PersistentVolume{ - Spec: api.PersistentVolumeSpec{ - AccessModes: []api.AccessModeType{api.ReadWriteOnce}, - Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), - }, - PersistentVolumeSource: api.PersistentVolumeSource{ - HostPath: &api.HostPathVolumeSource{ - Path: "/tmp/data02", - }, - }, - ClaimRef: &api.ObjectReference{Name: "foo"}, - }, - } - - if !updateRequired(old, new) { - t.Errorf("Update expected for the new volume with added ClaimRef") - } - - old.Spec.ClaimRef = new.Spec.ClaimRef - old.Status.Phase = api.VolumeBound - - if !updateRequired(old, new) { - t.Errorf("Update expected for the new volume with added Status") - } - - new.Status.Phase = old.Status.Phase - - if updateRequired(old, new) { - t.Errorf("No updated expected for identical objects") - } -} - func TestBindingWithExamples(t *testing.T) { - api.ForTesting_ReferencesAllowBlankSelfLinks = true o := testclient.NewObjects(api.Scheme) if err := testclient.AddObjectsFromPath("../../examples/persistent-volumes/claims/claim-01.yaml", o); err != nil { @@ -245,7 +195,7 @@ func TestBindingWithExamples(t *testing.T) { } volumeIndex.Add(pv) - syncVolume(mockClient, pv) + syncVolume(volumeIndex, mockClient, pv) if pv.Status.Phase != api.VolumeAvailable { t.Errorf("Expected phase %s but got %s", api.VolumeBound, pv.Status.Phase) @@ -261,10 +211,13 @@ func TestBindingWithExamples(t *testing.T) { t.Errorf("Expected ClaimRef but got nil for volume: %+v\n", pv) } - syncVolume(mockClient, pv) + // first sync verifies the new bound claim, advances state, triggering update + syncVolume(volumeIndex, mockClient, pv) + // second sync verifies claim, sees missing claim status and builds it + syncVolume(volumeIndex, mockClient, pv) if claim.Status.VolumeRef == nil { - t.Error("Expected claim to be bound to volume") + t.Fatalf("Expected claim to be bound to volume") } if pv.Status.Phase != api.VolumeBound { @@ -283,7 +236,7 @@ func TestBindingWithExamples(t *testing.T) { // pretend the user deleted their claim mockClient.claim = nil - syncVolume(mockClient, pv) + syncVolume(volumeIndex, mockClient, pv) if pv.Status.Phase != api.VolumeReleased { t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) @@ -311,7 +264,7 @@ func (c *mockBinderClient) GetPersistentVolumeClaim(namespace, name string) (*ap if c.claim != nil { return c.claim, nil } else { - return nil, fmt.Errorf("Claim does not exist") + return nil, errors.NewNotFound("persistentVolume", name) } } diff --git a/test/integration/persistent_volumes_test.go b/test/integration/persistent_volumes_test.go index 87be82de4e..6801b5ad40 100644 --- a/test/integration/persistent_volumes_test.go +++ b/test/integration/persistent_volumes_test.go @@ -76,8 +76,25 @@ func TestPersistentVolumeClaimBinder(t *testing.T) { t.Errorf("expected 3 PVCs, got %#v", len(claims.Items)) } - // make sure the binder has caught up - time.Sleep(2 * time.Second) + // the binder will eventually catch up and set status on Claims + watch, err := client.PersistentVolumeClaims(api.NamespaceDefault).Watch(labels.Everything(), fields.Everything(), "0") + if err != nil { + t.Fatalf("Couldn't subscribe to PersistentVolumeClaims: %v", err) + } + defer watch.Stop() + + boundCount := 0 + expectedBoundCount := 2 + for { + event := <-watch.ResultChan() + claim := event.Object.(*api.PersistentVolumeClaim) + if claim.Status.VolumeRef != nil { + boundCount++ + } + if boundCount == expectedBoundCount { + break + } + } for _, claim := range createTestClaims() { claim, err := client.PersistentVolumeClaims(api.NamespaceDefault).Get(claim.Name) @@ -86,7 +103,7 @@ func TestPersistentVolumeClaimBinder(t *testing.T) { } if (claim.Name == "claim01" || claim.Name == "claim02") && claim.Status.VolumeRef == nil { - t.Errorf("Expected claim to be bound: %v", claim) + t.Errorf("Expected claim to be bound: %+v", claim) } if claim.Name == "claim03" && claim.Status.VolumeRef != nil { t.Errorf("Expected claim03 to be unbound: %v", claim)