diff --git a/api/swagger-spec/v1.json b/api/swagger-spec/v1.json index d40967a0f5..52b0a3d1f6 100644 --- a/api/swagger-spec/v1.json +++ b/api/swagger-spec/v1.json @@ -10603,6 +10603,10 @@ "claimRef": { "$ref": "v1.ObjectReference", "description": "when bound, a reference to the bound claim" + }, + "persistentVolumeReclaimPolicy": { + "type": "string", + "description": "persistentVolumeReclaimPolicy is what happens to a volume when released from its claim; Default is Retain." } } }, @@ -10807,6 +10811,14 @@ "phase": { "type": "string", "description": "the current phase of a persistent volume" + }, + "message": { + "type": "string", + "description": "human-readable message indicating details about why the volume is in this state" + }, + "reason": { + "type": "string", + "description": "(brief) reason the volume is not is not available, such as failed recycling" } } }, @@ -12041,4 +12053,4 @@ } } } - } \ No newline at end of file + } diff --git a/api/swagger-spec/v1beta3.json b/api/swagger-spec/v1beta3.json index 0944cab641..055242aaac 100644 --- a/api/swagger-spec/v1beta3.json +++ b/api/swagger-spec/v1beta3.json @@ -10600,6 +10600,10 @@ "claimRef": { "$ref": "v1beta3.ObjectReference", "description": "when bound, a reference to the bound claim" + }, + "persistentVolumeReclaimPolicy": { + "type": "string", + "description": "persistentVolumeReclaimPolicy is what happens to a volume when released from its claim; Default is Retain." } } }, @@ -10804,6 +10808,14 @@ "phase": { "type": "string", "description": "the current phase of a persistent volume" + }, + "message": { + "type": "string", + "description": "human-readable message indicating details about why the volume is in this state" + }, + "reason": { + "type": "string", + "description": "(brief) reason the volume is not is not available, such as failed recycling" } } }, @@ -12047,4 +12059,4 @@ } } } - } \ No newline at end of file + } diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 79ca36491b..8d68d457aa 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -234,6 +234,11 @@ func (s *CMServer) Run(_ []string) error { pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod) pvclaimBinder.Run() + pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins()) + if err != nil { + glog.Fatalf("Failed to start persistent volume recycler: %+v", err) + } + pvRecycler.Run() if len(s.ServiceAccountKeyFile) > 0 { privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile) diff --git a/cmd/kube-controller-manager/app/plugins.go b/cmd/kube-controller-manager/app/plugins.go index 7f8cc61f5f..cd6b1649b8 100644 --- a/cmd/kube-controller-manager/app/plugins.go +++ b/cmd/kube-controller-manager/app/plugins.go @@ -20,6 +20,8 @@ import ( // This file exists to force the desired plugin implementations to be linked. // This should probably be part of some configuration fed into the build for a // given binary target. + + //Cloud providers _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/mesos" @@ -27,4 +29,21 @@ import ( _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/rackspace" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant" + + // Volume plugins + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/nfs" ) + +// ProbeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list. +func ProbeRecyclableVolumePlugins() []volume.VolumePlugin { + allPlugins := []volume.VolumePlugin{} + + // The list of plugins to probe is decided by the kubelet binary, not + // by dynamic linking or other "magic". Plugins will be analyzed and + // initialized later. + allPlugins = append(allPlugins, host_path.ProbeVolumePlugins()...) + allPlugins = append(allPlugins, nfs.ProbeVolumePlugins()...) + return allPlugins +} diff --git a/examples/persistent-volumes/volumes/local-02.yaml b/examples/persistent-volumes/volumes/local-02.yaml index 24bca72539..4be4c3ce12 100644 --- a/examples/persistent-volumes/volumes/local-02.yaml +++ b/examples/persistent-volumes/volumes/local-02.yaml @@ -6,8 +6,9 @@ metadata: type: local spec: capacity: - storage: 5Gi + storage: 8Gi accessModes: - ReadWriteOnce hostPath: path: "/tmp/data02" + persistentVolumeReclaimPolicy: Recycle diff --git a/pkg/api/deep_copy_generated.go b/pkg/api/deep_copy_generated.go index 340b13e6e3..a3b2b03f42 100644 --- a/pkg/api/deep_copy_generated.go +++ b/pkg/api/deep_copy_generated.go @@ -1243,11 +1243,14 @@ func deepCopy_api_PersistentVolumeSpec(in PersistentVolumeSpec, out *PersistentV } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = in.PersistentVolumeReclaimPolicy return nil } func deepCopy_api_PersistentVolumeStatus(in PersistentVolumeStatus, out *PersistentVolumeStatus, c *conversion.Cloner) error { out.Phase = in.Phase + out.Message = in.Message + out.Reason = in.Reason return nil } diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index 5a48b9fbe8..e7e24f1e69 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -234,8 +234,11 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { }, func(pv *api.PersistentVolume, c fuzz.Continue) { c.FuzzNoCustom(pv) // fuzz self without calling this function again - types := []api.PersistentVolumePhase{api.VolumePending, api.VolumeBound, api.VolumeReleased, api.VolumeAvailable} + types := []api.PersistentVolumePhase{api.VolumeAvailable, api.VolumePending, api.VolumeBound, api.VolumeReleased, api.VolumeFailed} pv.Status.Phase = types[c.Rand.Intn(len(types))] + pv.Status.Message = c.RandString() + reclamationPolicies := []api.PersistentVolumeReclaimPolicy{api.PersistentVolumeReclaimRecycle, api.PersistentVolumeReclaimRetain} + pv.Spec.PersistentVolumeReclaimPolicy = reclamationPolicies[c.Rand.Intn(len(reclamationPolicies))] }, func(pvc *api.PersistentVolumeClaim, c fuzz.Continue) { c.FuzzNoCustom(pvc) // fuzz self without calling this function again diff --git a/pkg/api/types.go b/pkg/api/types.go index 08238bda00..64e4b99376 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -263,11 +263,33 @@ type PersistentVolumeSpec struct { // ClaimRef is expected to be non-nil when bound. // claim.VolumeName is the authoritative bind between PV and PVC. ClaimRef *ObjectReference `json:"claimRef,omitempty"` + // Optional: what happens to a persistent volume when released from its claim. + PersistentVolumeReclaimPolicy PersistentVolumeReclaimPolicy `json:"persistentVolumeReclaimPolicy,omitempty" description:"what happens to a volume when released from its claim; Valid options are Retain (default) and Recycle. Recyling must be supported by the volume plugin underlying this persistent volume."` } +// PersistentVolumeReclaimPolicy describes a policy for end-of-life maintenance of persistent volumes +type PersistentVolumeReclaimPolicy string + +const ( + // PersistentVolumeReclaimRecycle means the volume will be recycled back into the pool of unbound persistent volumes on release from its claim. + // The volume plugin must support Recycling. + PersistentVolumeReclaimRecycle PersistentVolumeReclaimPolicy = "Recycle" + // PersistentVolumeReclaimDelete means the volume will be deleted from Kubernetes on release from its claim. + // The volume plugin must support Deletion. + // TODO: implement w/ DeletableVolumePlugin + // PersistentVolumeReclaimDelete PersistentVolumeReclaimPolicy = "Delete" + // PersistentVolumeReclaimRetain means the volume will left in its current phase (Released) for manual reclamation by the administrator. + // The default policy is Retain. + PersistentVolumeReclaimRetain PersistentVolumeReclaimPolicy = "Retain" +) + type PersistentVolumeStatus struct { // Phase indicates if a volume is available, bound to a claim, or released by a claim Phase PersistentVolumePhase `json:"phase,omitempty"` + // A human-readable message indicating details about why the volume is in this state. + Message string `json:"message,omitempty"` + // Reason is a brief CamelCase string that describes any failure and is meant for machine parsing and tidy display in the CLI + Reason string `json:"reason,omitempty"` } type PersistentVolumeList struct { @@ -331,12 +353,16 @@ const ( // used for PersistentVolumes that are not available VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound + // Available volumes are held by the binder and matched to PersistentVolumeClaims VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound VolumeBound PersistentVolumePhase = "Bound" // used for PersistentVolumes where the bound PersistentVolumeClaim was deleted // released volumes must be recycled before becoming available again + // this phase is used by the persistent volume claim binder to signal to another process to reclaim the resource VolumeReleased PersistentVolumePhase = "Released" + // used for PersistentVolumes that failed to be correctly recycled or deleted after being released from a claim + VolumeFailed PersistentVolumePhase = "Failed" ) type PersistentVolumeClaimPhase string diff --git a/pkg/api/v1/conversion_generated.go b/pkg/api/v1/conversion_generated.go index 2e14255849..e47b53c5a3 100644 --- a/pkg/api/v1/conversion_generated.go +++ b/pkg/api/v1/conversion_generated.go @@ -1358,6 +1358,7 @@ func convert_api_PersistentVolumeSpec_To_v1_PersistentVolumeSpec(in *api.Persist } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy) return nil } @@ -1366,6 +1367,8 @@ func convert_api_PersistentVolumeStatus_To_v1_PersistentVolumeStatus(in *api.Per defaulting.(func(*api.PersistentVolumeStatus))(in) } out.Phase = PersistentVolumePhase(in.Phase) + out.Message = in.Message + out.Reason = in.Reason return nil } @@ -3663,6 +3666,7 @@ func convert_v1_PersistentVolumeSpec_To_api_PersistentVolumeSpec(in *PersistentV } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy) return nil } @@ -3671,6 +3675,8 @@ func convert_v1_PersistentVolumeStatus_To_api_PersistentVolumeStatus(in *Persist defaulting.(func(*PersistentVolumeStatus))(in) } out.Phase = api.PersistentVolumePhase(in.Phase) + out.Message = in.Message + out.Reason = in.Reason return nil } diff --git a/pkg/api/v1/deep_copy_generated.go b/pkg/api/v1/deep_copy_generated.go index d724895bf8..978d3652be 100644 --- a/pkg/api/v1/deep_copy_generated.go +++ b/pkg/api/v1/deep_copy_generated.go @@ -1174,11 +1174,14 @@ func deepCopy_v1_PersistentVolumeSpec(in PersistentVolumeSpec, out *PersistentVo } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = in.PersistentVolumeReclaimPolicy return nil } func deepCopy_v1_PersistentVolumeStatus(in PersistentVolumeStatus, out *PersistentVolumeStatus, c *conversion.Cloner) error { out.Phase = in.Phase + out.Message = in.Message + out.Reason = in.Reason return nil } diff --git a/pkg/api/v1/defaults.go b/pkg/api/v1/defaults.go index cfc8a1fdec..59b0e8dd45 100644 --- a/pkg/api/v1/defaults.go +++ b/pkg/api/v1/defaults.go @@ -113,6 +113,9 @@ func addDefaultingFuncs() { if obj.Status.Phase == "" { obj.Status.Phase = VolumePending } + if obj.Spec.PersistentVolumeReclaimPolicy == "" { + obj.Spec.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimRetain + } }, func(obj *PersistentVolumeClaim) { if obj.Status.Phase == "" { diff --git a/pkg/api/v1/defaults_test.go b/pkg/api/v1/defaults_test.go index a359ee0de1..a7d8d51b4a 100644 --- a/pkg/api/v1/defaults_test.go +++ b/pkg/api/v1/defaults_test.go @@ -258,6 +258,9 @@ func TestSetDefaultPersistentVolume(t *testing.T) { if pv2.Status.Phase != versioned.VolumePending { t.Errorf("Expected volume phase %v, got %v", versioned.VolumePending, pv2.Status.Phase) } + if pv2.Spec.PersistentVolumeReclaimPolicy != versioned.PersistentVolumeReclaimRetain { + t.Errorf("Expected pv reclaim policy %v, got %v", versioned.PersistentVolumeReclaimRetain, pv2.Spec.PersistentVolumeReclaimPolicy) + } } func TestSetDefaultPersistentVolumeClaim(t *testing.T) { diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 69ea340251..8abc38b22e 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -280,11 +280,33 @@ type PersistentVolumeSpec struct { // ClaimRef is expected to be non-nil when bound. // claim.VolumeName is the authoritative bind between PV and PVC. ClaimRef *ObjectReference `json:"claimRef,omitempty" description:"when bound, a reference to the bound claim"` + // Optional: what happens to a persistent volume when released from its claim. + PersistentVolumeReclaimPolicy PersistentVolumeReclaimPolicy `json:"persistentVolumeReclaimPolicy,omitempty" description:"what happens to a volume when released from its claim; Valid options are Retain (default) and Recycle. Recyling must be supported by the volume plugin underlying this persistent volume."` } +// PersistentVolumeReclaimPolicy describes a policy for end-of-life maintenance of persistent volumes +type PersistentVolumeReclaimPolicy string + +const ( + // PersistentVolumeReclaimRecycle means the volume will be recycled back into the pool of unbound persistent volumes on release from its claim. + // The volume plugin must support Recycling. + PersistentVolumeReclaimRecycle PersistentVolumeReclaimPolicy = "Recycle" + // PersistentVolumeReclaimDelete means the volume will be deleted from Kubernetes on release from its claim. + // The volume plugin must support Deletion. + // TODO: implement w/ DeletableVolumePlugin + // PersistentVolumeReclaimDelete PersistentVolumeReclaimPolicy = "Delete" + // PersistentVolumeReclaimRetain means the volume will left in its current phase (Released) for manual reclamation by the administrator. + // The default policy is Retain. + PersistentVolumeReclaimRetain PersistentVolumeReclaimPolicy = "Retain" +) + type PersistentVolumeStatus struct { // Phase indicates if a volume is available, bound to a claim, or released by a claim Phase PersistentVolumePhase `json:"phase,omitempty" description:"the current phase of a persistent volume"` + // A human-readable message indicating details about why the volume is in this state. + Message string `json:"message,omitempty" description:"human-readable message indicating details about why the volume is in this state"` + // Reason is a brief CamelCase string that describes any failure and is meant for machine parsing and tidy display in the CLI + Reason string `json:"reason,omitempty" description:"(brief) reason the volume is not is not available"` } type PersistentVolumeList struct { @@ -348,12 +370,16 @@ const ( // used for PersistentVolumes that are not available VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound + // Available volumes are held by the binder and matched to PersistentVolumeClaims VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound VolumeBound PersistentVolumePhase = "Bound" // used for PersistentVolumes where the bound PersistentVolumeClaim was deleted // released volumes must be recycled before becoming available again + // this phase is used by the persistent volume claim binder to signal to another process to reclaim the resource VolumeReleased PersistentVolumePhase = "Released" + // used for PersistentVolumes that failed to be correctly recycled or deleted after being released from a claim + VolumeFailed PersistentVolumePhase = "Failed" ) type PersistentVolumeClaimPhase string diff --git a/pkg/api/v1beta3/conversion_generated.go b/pkg/api/v1beta3/conversion_generated.go index eb8e630ddd..e373b9acfa 100644 --- a/pkg/api/v1beta3/conversion_generated.go +++ b/pkg/api/v1beta3/conversion_generated.go @@ -1216,6 +1216,7 @@ func convert_api_PersistentVolumeSpec_To_v1beta3_PersistentVolumeSpec(in *api.Pe } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy) return nil } @@ -1224,6 +1225,8 @@ func convert_api_PersistentVolumeStatus_To_v1beta3_PersistentVolumeStatus(in *ap defaulting.(func(*api.PersistentVolumeStatus))(in) } out.Phase = PersistentVolumePhase(in.Phase) + out.Message = in.Message + out.Reason = in.Reason return nil } @@ -3275,6 +3278,7 @@ func convert_v1beta3_PersistentVolumeSpec_To_api_PersistentVolumeSpec(in *Persis } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy) return nil } @@ -3283,6 +3287,8 @@ func convert_v1beta3_PersistentVolumeStatus_To_api_PersistentVolumeStatus(in *Pe defaulting.(func(*PersistentVolumeStatus))(in) } out.Phase = api.PersistentVolumePhase(in.Phase) + out.Message = in.Message + out.Reason = in.Reason return nil } diff --git a/pkg/api/v1beta3/deep_copy_generated.go b/pkg/api/v1beta3/deep_copy_generated.go index 19f5f118c3..441f4ef5ec 100644 --- a/pkg/api/v1beta3/deep_copy_generated.go +++ b/pkg/api/v1beta3/deep_copy_generated.go @@ -1178,11 +1178,14 @@ func deepCopy_v1beta3_PersistentVolumeSpec(in PersistentVolumeSpec, out *Persist } else { out.ClaimRef = nil } + out.PersistentVolumeReclaimPolicy = in.PersistentVolumeReclaimPolicy return nil } func deepCopy_v1beta3_PersistentVolumeStatus(in PersistentVolumeStatus, out *PersistentVolumeStatus, c *conversion.Cloner) error { out.Phase = in.Phase + out.Message = in.Message + out.Reason = in.Reason return nil } diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index 74f931dd13..5e8793d331 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -117,6 +117,9 @@ func addDefaultingFuncs() { if obj.Status.Phase == "" { obj.Status.Phase = VolumePending } + if obj.Spec.PersistentVolumeReclaimPolicy == "" { + obj.Spec.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimRetain + } }, func(obj *PersistentVolumeClaim) { if obj.Status.Phase == "" { diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index 745be4c4f9..058cfcd955 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -195,6 +195,9 @@ func TestSetDefaultPersistentVolume(t *testing.T) { if pv2.Status.Phase != versioned.VolumePending { t.Errorf("Expected volume phase %v, got %v", versioned.VolumePending, pv2.Status.Phase) } + if pv2.Spec.PersistentVolumeReclaimPolicy != versioned.PersistentVolumeReclaimRetain { + t.Errorf("Expected pv reclaim policy %v, got %v", versioned.PersistentVolumeReclaimRetain, pv2.Spec.PersistentVolumeReclaimPolicy) + } } func TestSetDefaultPersistentVolumeClaim(t *testing.T) { diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index d8cd581362..f3235aaa1a 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -280,11 +280,33 @@ type PersistentVolumeSpec struct { // ClaimRef is expected to be non-nil when bound. // claim.VolumeName is the authoritative bind between PV and PVC. ClaimRef *ObjectReference `json:"claimRef,omitempty" description:"when bound, a reference to the bound claim"` + // Optional: what happens to a persistent volume when released from its claim. + PersistentVolumeReclaimPolicy PersistentVolumeReclaimPolicy `json:"persistentVolumeReclaimPolicy,omitempty" description:"what happens to a volume when released from its claim; Valid options are Retain (default) and Recycle. Recyling must be supported by the volume plugin underlying this persistent volume."` } +// PersistentVolumeReclaimPolicy describes a policy for end-of-life maintenance of persistent volumes +type PersistentVolumeReclaimPolicy string + +const ( + // PersistentVolumeReclaimRecycle means the volume will be recycled back into the pool of unbound persistent volumes on release from its claim. + // The volume plugin must support Recycling. + PersistentVolumeReclaimRecycle PersistentVolumeReclaimPolicy = "Recycle" + // PersistentVolumeReclaimDelete means the volume will be deleted from Kubernetes on release from its claim. + // The volume plugin must support Deletion. + // TODO: implement w/ DeletableVolumePlugin + // PersistentVolumeReclaimDelete PersistentVolumeReclaimPolicy = "Delete" + // PersistentVolumeReclaimRetain means the volume will left in its current phase (Released) for manual reclamation by the administrator. + // The default policy is Retain. + PersistentVolumeReclaimRetain PersistentVolumeReclaimPolicy = "Retain" +) + type PersistentVolumeStatus struct { // Phase indicates if a volume is available, bound to a claim, or released by a claim Phase PersistentVolumePhase `json:"phase,omitempty" description:"the current phase of a persistent volume"` + // A human-readable message indicating details about why the volume is in this state. + Message string `json:"message,omitempty" description:"human-readable message indicating details about why the volume is in this state"` + // Reason is a brief CamelCase string that describes any failure and is meant for machine parsing and tidy display in the CLI + Reason string `json:"reason,omitempty" description:"(brief) reason the volume is not is not available"` } type PersistentVolumeList struct { @@ -348,12 +370,16 @@ const ( // used for PersistentVolumes that are not available VolumePending PersistentVolumePhase = "Pending" // used for PersistentVolumes that are not yet bound + // Available volumes are held by the binder and matched to PersistentVolumeClaims VolumeAvailable PersistentVolumePhase = "Available" // used for PersistentVolumes that are bound VolumeBound PersistentVolumePhase = "Bound" // used for PersistentVolumes where the bound PersistentVolumeClaim was deleted // released volumes must be recycled before becoming available again + // this phase is used by the persistent volume claim binder to signal to another process to reclaim the resource VolumeReleased PersistentVolumePhase = "Released" + // used for PersistentVolumes that failed to be correctly recycled or deleted after being released from a claim + VolumeFailed PersistentVolumePhase = "Failed" ) type PersistentVolumeClaimPhase string diff --git a/pkg/client/persistentvolume_test.go b/pkg/client/persistentvolume_test.go index 28777c0f85..d06e56548a 100644 --- a/pkg/client/persistentvolume_test.go +++ b/pkg/client/persistentvolume_test.go @@ -148,7 +148,8 @@ func TestPersistentVolumeStatusUpdate(t *testing.T) { }, }, Status: api.PersistentVolumeStatus{ - Phase: api.VolumeBound, + Phase: api.VolumeBound, + Message: "foo", }, } c := &testClient{ diff --git a/pkg/kubectl/describe.go b/pkg/kubectl/describe.go index a3947047e5..5901441b05 100644 --- a/pkg/kubectl/describe.go +++ b/pkg/kubectl/describe.go @@ -311,6 +311,8 @@ func (d *PersistentVolumeDescriber) Describe(namespace, name string) (string, er } else { fmt.Fprintf(out, "Claim:\t%s\n", "") } + fmt.Fprintf(out, "Reclaim Policy:\t%d\n", pv.Spec.PersistentVolumeReclaimPolicy) + fmt.Fprintf(out, "Message:\t%d\n", pv.Status.Message) return nil }) } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index 90ad20f3ef..e0f17a9ecc 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -259,7 +259,7 @@ var resourceQuotaColumns = []string{"NAME"} var namespaceColumns = []string{"NAME", "LABELS", "STATUS"} var secretColumns = []string{"NAME", "TYPE", "DATA"} var serviceAccountColumns = []string{"NAME", "SECRETS"} -var persistentVolumeColumns = []string{"NAME", "LABELS", "CAPACITY", "ACCESSMODES", "STATUS", "CLAIM"} +var persistentVolumeColumns = []string{"NAME", "LABELS", "CAPACITY", "ACCESSMODES", "STATUS", "CLAIM", "REASON"} var persistentVolumeClaimColumns = []string{"NAME", "LABELS", "STATUS", "VOLUME"} var componentStatusColumns = []string{"NAME", "STATUS", "MESSAGE", "ERROR"} @@ -732,7 +732,7 @@ func printPersistentVolume(pv *api.PersistentVolume, w io.Writer, withNamespace aQty := pv.Spec.Capacity[api.ResourceStorage] aSize := aQty.Value() - _, err := fmt.Fprintf(w, "%s\t%s\t%d\t%s\t%s\t%s\n", name, formatLabels(pv.Labels), aSize, modesStr, pv.Status.Phase, claimRefUID) + _, err := fmt.Fprintf(w, "%s\t%s\t%d\t%s\t%s\t%s\t%s\n", name, formatLabels(pv.Labels), aSize, modesStr, pv.Status.Phase, claimRefUID, pv.Status.Reason) return err } diff --git a/pkg/registry/persistentvolume/etcd/etcd_test.go b/pkg/registry/persistentvolume/etcd/etcd_test.go index 0ce3d2912a..37afbfd57a 100644 --- a/pkg/registry/persistentvolume/etcd/etcd_test.go +++ b/pkg/registry/persistentvolume/etcd/etcd_test.go @@ -59,9 +59,12 @@ func validNewPersistentVolume(name string) *api.PersistentVolume { PersistentVolumeSource: api.PersistentVolumeSource{ HostPath: &api.HostPathVolumeSource{Path: "/foo"}, }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRetain, }, Status: api.PersistentVolumeStatus{ - Phase: api.VolumePending, + Phase: api.VolumePending, + Message: "bar", + Reason: "foo", }, } return pv diff --git a/pkg/util/util.go b/pkg/util/util.go index 639ac8f3cf..7ba9c78f3d 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -508,3 +508,11 @@ func GetClient(req *http.Request) string { } return "unknown" } + +func ShortenString(str string, n int) string { + if len(str) <= n { + return str + } else { + return str[:n] + } +} diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index 214cd28e4f..a0749b0156 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -18,23 +18,35 @@ package host_path import ( "fmt" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" ) // This is the primary entrypoint for volume plugins. +// Tests covering recycling should not use this func but instead +// use their own array of plugins w/ a custom recyclerFunc as appropriate func ProbeVolumePlugins() []volume.VolumePlugin { - return []volume.VolumePlugin{&hostPathPlugin{nil}} + return []volume.VolumePlugin{&hostPathPlugin{nil, newRecycler}} +} + +func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)) []volume.VolumePlugin { + return []volume.VolumePlugin{&hostPathPlugin{nil, recyclerFunc}} } type hostPathPlugin struct { host volume.VolumeHost + // decouple creating recyclers by deferring to a function. Allows for easier testing. + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) } var _ volume.VolumePlugin = &hostPathPlugin{} +var _ volume.PersistentVolumePlugin = &hostPathPlugin{} +var _ volume.RecyclableVolumePlugin = &hostPathPlugin{} const ( hostPathPluginName = "kubernetes.io/host-path" @@ -70,6 +82,18 @@ func (plugin *hostPathPlugin) NewCleaner(volName string, podUID types.UID, _ mou return &hostPath{""}, nil } +func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(spec, plugin.host) +} + +func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + if spec.VolumeSource.HostPath != nil { + return &hostPathRecycler{spec.Name, spec.VolumeSource.HostPath.Path, host}, nil + } else { + return &hostPathRecycler{spec.Name, spec.PersistentVolumeSource.HostPath.Path, host}, nil + } +} + // HostPath volumes represent a bare host file or directory mount. // The direct at the specified path will be directly exposed to the container. type hostPath struct { @@ -99,3 +123,64 @@ func (hp *hostPath) TearDown() error { func (hp *hostPath) TearDownAt(dir string) error { return fmt.Errorf("TearDownAt() does not make sense for host paths") } + +// hostPathRecycler scrubs a hostPath volume by running "rm -rf" on the volume in a pod +// This recycler only works on a single host cluster and is for testing purposes only. +type hostPathRecycler struct { + name string + path string + host volume.VolumeHost +} + +func (r *hostPathRecycler) GetPath() string { + return r.path +} + +// Recycler provides methods to reclaim the volume resource. +// A HostPath is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. This is meant for +// development and testing in a single node cluster only. +// Recycle blocks until the pod has completed or any error occurs. +// The scrubber pod's is expected to succeed within 30 seconds when testing localhost. +func (r *hostPathRecycler) Recycle() error { + timeout := int64(30 * time.Second) + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + ActiveDeadlineSeconds: &timeout, + RestartPolicy: api.RestartPolicyNever, + Volumes: []api.Volume{ + { + Name: "vol", + VolumeSource: api.VolumeSource{ + HostPath: &api.HostPathVolumeSource{r.path}, + }, + }, + }, + Containers: []api.Container{ + { + Name: "scrubber", + Image: "busybox", + // delete the contents of the volume, but not the directory itself + Command: []string{"/bin/sh"}, + // the scrubber: + // 1. validates the /scrub directory exists + // 2. creates a text file in the directory to be scrubbed + // 3. performs rm -rf on the directory + // 4. tests to see if the directory is empty + // the pod fails if the error code is returned + Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"}, + VolumeMounts: []api.VolumeMount{ + { + Name: "vol", + MountPath: "/scrub", + }, + }, + }, + }, + }, + } + return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient()) +} diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index 84ba54b128..0dfeeec89c 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -59,6 +59,47 @@ func TestGetAccessModes(t *testing.T) { } } +func TestRecycler(t *testing.T) { + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/foo"}}} + plug, err := plugMgr.FindRecyclablePluginBySpec(spec) + if err != nil { + t.Errorf("Can't find the plugin by name") + } + recycler, err := plug.NewRecycler(spec) + if err != nil { + t.Error("Failed to make a new Recyler: %v", err) + } + if recycler.GetPath() != spec.PersistentVolumeSource.HostPath.Path { + t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.HostPath.Path, recycler.GetPath()) + } + if err := recycler.Recycle(); err != nil { + t.Errorf("Mock Recycler expected to return nil but got %s", err) + } +} + +func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + return &mockRecycler{ + path: spec.PersistentVolumeSource.HostPath.Path, + }, nil +} + +type mockRecycler struct { + path string + host volume.VolumeHost +} + +func (r *mockRecycler) GetPath() string { + return r.path +} + +func (r *mockRecycler) Recycle() error { + // return nil means recycle passed + return nil +} + func TestPlugin(t *testing.T) { plugMgr := volume.VolumePluginMgr{} plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("fake", nil, nil)) diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go index a3183112aa..74f0f3bb82 100644 --- a/pkg/volume/nfs/nfs.go +++ b/pkg/volume/nfs/nfs.go @@ -19,25 +19,33 @@ package nfs import ( "fmt" "os" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/golang/glog" ) // This is the primary entrypoint for volume plugins. +// Tests covering recycling should not use this func but instead +// use their own array of plugins w/ a custom recyclerFunc as appropriate func ProbeVolumePlugins() []volume.VolumePlugin { - return []volume.VolumePlugin{&nfsPlugin{nil}} + return []volume.VolumePlugin{&nfsPlugin{nil, newRecycler}} } type nfsPlugin struct { host volume.VolumeHost + // decouple creating recyclers by deferring to a function. Allows for easier testing. + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) } var _ volume.VolumePlugin = &nfsPlugin{} +var _ volume.PersistentVolumePlugin = &nfsPlugin{} +var _ volume.RecyclableVolumePlugin = &nfsPlugin{} const ( nfsPluginName = "kubernetes.io/nfs" @@ -103,6 +111,28 @@ func (plugin *nfsPlugin) newCleanerInternal(volName string, podUID types.UID, mo }, nil } +func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(spec, plugin.host) +} + +func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + if spec.VolumeSource.HostPath != nil { + return &nfsRecycler{ + name: spec.Name, + server: spec.VolumeSource.NFS.Server, + path: spec.VolumeSource.NFS.Path, + host: host, + }, nil + } else { + return &nfsRecycler{ + name: spec.Name, + server: spec.PersistentVolumeSource.NFS.Server, + path: spec.PersistentVolumeSource.NFS.Path, + host: host, + }, nil + } +} + // NFS volumes represent a bare host file or directory mount of an NFS export. type nfs struct { volName string @@ -112,6 +142,8 @@ type nfs struct { readOnly bool mounter mount.Interface plugin *nfsPlugin + // decouple creating recyclers by deferring to a function. Allows for easier testing. + newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) } // SetUp attaches the disk and bind mounts to the volume path. @@ -199,3 +231,66 @@ func (nfsVolume *nfs) TearDownAt(dir string) error { return nil } + +// nfsRecycler scrubs an NFS volume by running "rm -rf" on the volume in a pod. +type nfsRecycler struct { + name string + server string + path string + host volume.VolumeHost +} + +func (r *nfsRecycler) GetPath() string { + return r.path +} + +// Recycler provides methods to reclaim the volume resource. +// A NFS volume is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. +// Recycle blocks until the pod has completed or any error occurs. +// The scrubber pod's is expected to succeed within 5 minutes else an error will be returned +func (r *nfsRecycler) Recycle() error { + timeout := int64(300 * time.Second) // 5 minutes + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-", + Namespace: api.NamespaceDefault, + }, + Spec: api.PodSpec{ + ActiveDeadlineSeconds: &timeout, + RestartPolicy: api.RestartPolicyNever, + Volumes: []api.Volume{ + { + Name: "vol", + VolumeSource: api.VolumeSource{ + NFS: &api.NFSVolumeSource{ + Server: r.server, + Path: r.path, + }, + }, + }, + }, + Containers: []api.Container{ + { + Name: "scrubber", + Image: "busybox", + // delete the contents of the volume, but not the directory itself + Command: []string{"/bin/sh"}, + // the scrubber: + // 1. validates the /scrub directory exists + // 2. creates a text file to be scrubbed + // 3. performs rm -rf on the directory + // 4. tests to see if the directory is empty + // the pod fails if the error code is returned + Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"}, + VolumeMounts: []api.VolumeMount{ + { + Name: "vol", + MountPath: "/scrub", + }, + }, + }, + }, + }, + } + return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient()) +} diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go index 948397eb3c..2b7520def8 100644 --- a/pkg/volume/nfs/nfs_test.go +++ b/pkg/volume/nfs/nfs_test.go @@ -60,6 +60,47 @@ func TestGetAccessModes(t *testing.T) { } } +func TestRecycler(t *testing.T) { + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{newRecyclerFunc: newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{NFS: &api.NFSVolumeSource{Path: "/foo"}}} + plug, err := plugMgr.FindRecyclablePluginBySpec(spec) + if err != nil { + t.Errorf("Can't find the plugin by name") + } + recycler, err := plug.NewRecycler(spec) + if err != nil { + t.Error("Failed to make a new Recyler: %v", err) + } + if recycler.GetPath() != spec.PersistentVolumeSource.NFS.Path { + t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.NFS.Path, recycler.GetPath()) + } + if err := recycler.Recycle(); err != nil { + t.Errorf("Mock Recycler expected to return nil but got %s", err) + } +} + +func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + return &mockRecycler{ + path: spec.PersistentVolumeSource.NFS.Path, + }, nil +} + +type mockRecycler struct { + path string + host volume.VolumeHost +} + +func (r *mockRecycler) GetPath() string { + return r.path +} + +func (r *mockRecycler) Recycle() error { + // return nil means recycle passed + return nil +} + func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeAccessMode) bool { for _, m := range modes { if m == mode { diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 54a5f68a89..d94807fd89 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -79,6 +79,15 @@ type PersistentVolumePlugin interface { GetAccessModes() []api.PersistentVolumeAccessMode } +// RecyclableVolumePlugin is an extended interface of VolumePlugin and is used +// by persistent volumes that want to be recycled before being made available again to new claims +type RecyclableVolumePlugin interface { + VolumePlugin + // NewRecycler creates a new volume.Recycler which knows how to reclaim this resource + // after the volume's release from a PersistentVolumeClaim + NewRecycler(spec *Spec) (Recycler, error) +} + // VolumeHost is an interface that plugins can use to access the kubelet. type VolumeHost interface { // GetPluginDir returns the absolute path to a directory under which @@ -217,7 +226,20 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) { return pm.plugins[matches[0]], nil } -// FindPluginByName fetches a plugin by name or by legacy name. If no plugin +// FindPersistentPluginBySpec looks for a persistent volume plugin that can support a given volume +// specification. If no plugin is found, return an error +func (pm *VolumePluginMgr) FindPersistentPluginBySpec(spec *Spec) (PersistentVolumePlugin, error) { + volumePlugin, err := pm.FindPluginBySpec(spec) + if err != nil { + return nil, fmt.Errorf("Could not find volume plugin for spec: %+v", spec) + } + if persistentVolumePlugin, ok := volumePlugin.(PersistentVolumePlugin); ok { + return persistentVolumePlugin, nil + } + return nil, fmt.Errorf("no persistent volume plugin matched") +} + +// FindPersistentPluginByName fetches a persistent volume plugin by name. If no plugin // is found, returns error. func (pm *VolumePluginMgr) FindPersistentPluginByName(name string) (PersistentVolumePlugin, error) { volumePlugin, err := pm.FindPluginByName(name) @@ -227,5 +249,18 @@ func (pm *VolumePluginMgr) FindPersistentPluginByName(name string) (PersistentVo if persistentVolumePlugin, ok := volumePlugin.(PersistentVolumePlugin); ok { return persistentVolumePlugin, nil } - return nil, fmt.Errorf("no persistent volume plugin matched") + return nil, fmt.Errorf("no persistent volume plugin matched: %+v") +} + +// FindRecyclablePluginByName fetches a persistent volume plugin by name. If no plugin +// is found, returns error. +func (pm *VolumePluginMgr) FindRecyclablePluginBySpec(spec *Spec) (RecyclableVolumePlugin, error) { + volumePlugin, err := pm.FindPluginBySpec(spec) + if err != nil { + return nil, err + } + if recyclableVolumePlugin, ok := volumePlugin.(RecyclableVolumePlugin); ok { + return recyclableVolumePlugin, nil + } + return nil, fmt.Errorf("no recyclable volume plugin matched") } diff --git a/pkg/volume/testing.go b/pkg/volume/testing.go index 6502b326cf..3ac6d9736e 100644 --- a/pkg/volume/testing.go +++ b/pkg/volume/testing.go @@ -82,6 +82,7 @@ type FakeVolumePlugin struct { } var _ VolumePlugin = &FakeVolumePlugin{} +var _ RecyclableVolumePlugin = &FakeVolumePlugin{} func (plugin *FakeVolumePlugin) Init(host VolumeHost) { plugin.Host = host @@ -104,6 +105,10 @@ func (plugin *FakeVolumePlugin) NewCleaner(volName string, podUID types.UID, mou return &FakeVolume{podUID, volName, plugin}, nil } +func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) { + return &FakeRecycler{"/attributesTransferredFromSpec"}, nil +} + func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode { return []api.PersistentVolumeAccessMode{} } @@ -133,3 +138,16 @@ func (fv *FakeVolume) TearDown() error { func (fv *FakeVolume) TearDownAt(dir string) error { return os.RemoveAll(dir) } + +type FakeRecycler struct { + path string +} + +func (fr *FakeRecycler) Recycle() error { + // nil is success, else error + return nil +} + +func (fr *FakeRecycler) GetPath() string { + return fr.path +} diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 3efa382393..d9a1c87ecf 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -17,7 +17,18 @@ limitations under the License. package volume import ( + "fmt" + "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/golang/glog" ) func GetAccessModesAsString(modes []api.PersistentVolumeAccessMode) string { @@ -51,3 +62,89 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA } return false } + +// ScrubPodVolumeAndWatchUntilCompletion is intended for use with volume Recyclers. This function will +// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first. +// An attempt to delete a scrubber pod is always attempted before returning. +// pod - the pod designed by a volume plugin to scrub the volume's contents +// client - kube client for API operations. +func ScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, kubeClient client.Interface) error { + return internalScrubPodVolumeAndWatchUntilCompletion(pod, newScrubberClient(kubeClient)) +} + +// same as above func comments, except 'scrubberClient' is a narrower pod API interface to ease testing +func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient scrubberClient) error { + glog.V(5).Infof("Creating scrubber pod for volume %s\n", pod.Name) + pod, err := scrubberClient.CreatePod(pod) + if err != nil { + return fmt.Errorf("Unexpected error creating a pod to scrub volume %s: %+v\n", pod.Name, err) + } + + defer scrubberClient.DeletePod(pod.Name, pod.Namespace) + + nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion) + for { + watchedPod := nextPod() + if watchedPod.Status.Phase == api.PodSucceeded { + // volume.Recycle() returns nil on success, else error + return nil + } + if watchedPod.Status.Phase == api.PodFailed { + // volume.Recycle() returns nil on success, else error + if watchedPod.Status.Message != "" { + return fmt.Errorf(watchedPod.Status.Message) + } else { + return fmt.Errorf("Pod failed, pod.Status.Message unknown.") + } + } + } +} + +// scrubberClient abstracts access to a Pod by providing a narrower interface. +// this makes it easier to mock a client for testing +type scrubberClient interface { + CreatePod(pod *api.Pod) (*api.Pod, error) + GetPod(name, namespace string) (*api.Pod, error) + DeletePod(name, namespace string) error + WatchPod(name, namespace, resourceVersion string) func() *api.Pod +} + +func newScrubberClient(client client.Interface) scrubberClient { + return &realScrubberClient{client} +} + +type realScrubberClient struct { + client client.Interface +} + +func (c *realScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) { + return c.client.Pods(pod.Namespace).Create(pod) +} + +func (c *realScrubberClient) GetPod(name, namespace string) (*api.Pod, error) { + return c.client.Pods(namespace).Get(name) +} + +func (c *realScrubberClient) DeletePod(name, namespace string) error { + return c.client.Pods(namespace).Delete(name, nil) +} + +func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod { + fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) + + podLW := &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return c.client.Pods(namespace).List(labels.Everything(), fieldSelector) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion) + }, + } + queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) + cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).Run() + + return func() *api.Pod { + obj := queue.Pop() + return obj.(*api.Pod) + } +} diff --git a/pkg/volume/util_test.go b/pkg/volume/util_test.go new file mode 100644 index 0000000000..b921d013f5 --- /dev/null +++ b/pkg/volume/util_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volume + +import ( + "fmt" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "strings" +) + +func TestScrubberSuccess(t *testing.T) { + client := &mockScrubberClient{} + scrubber := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "scrubber-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodSucceeded, + }, + } + + err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client) + if err != nil { + t.Errorf("Unexpected error watching scrubber pod: %+v", err) + } + if !client.deletedCalled { + t.Errorf("Expected deferred client.Delete to be called on scrubber pod") + } +} + +func TestScrubberFailure(t *testing.T) { + client := &mockScrubberClient{} + scrubber := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "scrubber-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodFailed, + Message: "foo", + }, + } + + err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client) + if err == nil { + t.Fatalf("Expected pod failure but got nil error returned") + } + if err != nil { + if !strings.Contains(err.Error(), "foo") { + t.Errorf("Expected pod.Status.Message %s but got %s", scrubber.Status.Message, err) + } + } + if !client.deletedCalled { + t.Errorf("Expected deferred client.Delete to be called on scrubber pod") + } +} + +type mockScrubberClient struct { + pod *api.Pod + deletedCalled bool +} + +func (c *mockScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) { + c.pod = pod + return c.pod, nil +} + +func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) { + if c.pod != nil { + return c.pod, nil + } else { + return nil, fmt.Errorf("pod does not exist") + } +} + +func (c *mockScrubberClient) DeletePod(name, namespace string) error { + c.deletedCalled = true + return nil +} + +func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod { + return func() *api.Pod { + return c.pod + } +} diff --git a/pkg/volume/volume.go b/pkg/volume/volume.go index e1af5f699f..ee668400a8 100644 --- a/pkg/volume/volume.go +++ b/pkg/volume/volume.go @@ -29,7 +29,7 @@ type Volume interface { GetPath() string } -// Builder interface provides method to set up/mount the volume. +// Builder interface provides methods to set up/mount the volume. type Builder interface { // Uses Interface to provide the path for Docker binds. Volume @@ -43,7 +43,7 @@ type Builder interface { SetUpAt(dir string) error } -// Cleaner interface provides method to cleanup/unmount the volumes. +// Cleaner interface provides methods to cleanup/unmount the volumes. type Cleaner interface { Volume // TearDown unmounts the volume from a self-determined directory and @@ -54,6 +54,14 @@ type Cleaner interface { TearDownAt(dir string) error } +// Recycler provides methods to reclaim the volume resource. +type Recycler interface { + Volume + // Recycle reclaims the resource. Calls to this method should block until the recycling task is complete. + // Any error returned indicates the volume has failed to be reclaimed. A nil return indicates success. + Recycle() error +} + func RenameDirectory(oldPath, newName string) (string, error) { newPath, err := ioutil.TempDir(path.Dir(oldPath), newName) if err != nil { diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go index 2b62325057..8b4c94bfb2 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder.go @@ -99,7 +99,10 @@ func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) { binder.lock.Lock() defer binder.lock.Unlock() volume := obj.(*api.PersistentVolume) - syncVolume(binder.volumeIndex, binder.client, volume) + err := syncVolume(binder.volumeIndex, binder.client, volume) + if err != nil { + glog.Errorf("PVClaimBinder could not add volume %s: %+v", volume.Name, err) + } } func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) { @@ -107,7 +110,10 @@ func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface defer binder.lock.Unlock() newVolume := newObj.(*api.PersistentVolume) binder.volumeIndex.Update(newVolume) - syncVolume(binder.volumeIndex, binder.client, newVolume) + err := syncVolume(binder.volumeIndex, binder.client, newVolume) + if err != nil { + glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err) + } } func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) { @@ -121,18 +127,24 @@ func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) { binder.lock.Lock() defer binder.lock.Unlock() claim := obj.(*api.PersistentVolumeClaim) - syncClaim(binder.volumeIndex, binder.client, claim) + err := syncClaim(binder.volumeIndex, binder.client, claim) + if err != nil { + glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err) + } } func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) { binder.lock.Lock() defer binder.lock.Unlock() newClaim := newObj.(*api.PersistentVolumeClaim) - syncClaim(binder.volumeIndex, binder.client, newClaim) + err := syncClaim(binder.volumeIndex, binder.client, newClaim) + if err != nil { + glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err) + } } func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) { - glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name) + glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase) // volumes can be in one of the following states: // @@ -140,12 +152,28 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl // VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex. // VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct. // VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user. + // VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler currentPhase := volume.Status.Phase nextPhase := currentPhase switch currentPhase { // pending volumes are available only after indexing in order to be matched to claims. case api.VolumePending: + if volume.Spec.ClaimRef != nil { + // Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending + // to start the volume again at the beginning of this lifecycle. + // ClaimRef is the last bind between persistent volume and claim. + // The claim has already been deleted by the user at this point + oldClaimRef := volume.Spec.ClaimRef + volume.Spec.ClaimRef = nil + _, err = binderClient.UpdatePersistentVolume(volume) + if err != nil { + // rollback on error, keep the ClaimRef until we can successfully update the volume + volume.Spec.ClaimRef = oldClaimRef + return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err) + } + } + _, exists, err := volumeIndex.Get(volume) if err != nil { return err @@ -178,10 +206,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl } } } + //bound volumes require verification of their bound claims case api.VolumeBound: if volume.Spec.ClaimRef == nil { - return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume) + return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) } else { _, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name) if err != nil { @@ -192,12 +221,22 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl } } } + // released volumes require recycling case api.VolumeReleased: + if volume.Spec.ClaimRef == nil { + return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume) + } else { + // another process is watching for released volumes. + // PersistentVolumeReclaimPolicy is set per PersistentVolume + } + + // volumes are removed by processes external to this binder and must be removed from the cluster + case api.VolumeFailed: if volume.Spec.ClaimRef == nil { return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume) } else { - // TODO: implement Recycle method on plugins + glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name) } } @@ -206,6 +245,7 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl // a change in state will trigger another update through this controller. // each pass through this controller evaluates current phase and decides whether or not to change to the next phase + glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase) volume, err := binderClient.UpdatePersistentVolumeStatus(volume) if err != nil { // Rollback to previous phase @@ -262,6 +302,7 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli } if volume.Spec.ClaimRef == nil { + glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n") claimRef, err := api.GetReference(claim) if err != nil { return fmt.Errorf("Unexpected error getting claim reference: %v\n", err) @@ -326,6 +367,7 @@ func (controller *PersistentVolumeClaimBinder) Stop() { type binderClient interface { GetPersistentVolume(name string) (*api.PersistentVolume, error) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) + DeletePersistentVolume(volume *api.PersistentVolume) error UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error) UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) @@ -348,6 +390,10 @@ func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) return c.client.PersistentVolumes().Update(volume) } +func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { + return c.client.PersistentVolumes().Delete(volume.Name) +} + func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { return c.client.PersistentVolumes().UpdateStatus(volume) } diff --git a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go index 755c887ab2..0f2e106159 100644 --- a/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go +++ b/pkg/volumeclaimbinder/persistent_volume_claim_binder_test.go @@ -26,6 +26,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path" ) func TestRunStop(t *testing.T) { @@ -98,13 +100,14 @@ func TestExampleObjects(t *testing.T) { Spec: api.PersistentVolumeSpec{ AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce}, Capacity: api.ResourceList{ - api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"), + api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"), }, PersistentVolumeSource: api.PersistentVolumeSource{ HostPath: &api.HostPathVolumeSource{ Path: "/tmp/data02", }, }, + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, }, }, }, @@ -179,6 +182,7 @@ func TestBindingWithExamples(t *testing.T) { client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)} pv, err := client.PersistentVolumes().Get("any") + pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle if err != nil { t.Error("Unexpected error getting PV from client: %v", err) } @@ -194,6 +198,15 @@ func TestBindingWithExamples(t *testing.T) { claim: claim, } + plugMgr := volume.VolumePluginMgr{} + plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil)) + + recycler := &PersistentVolumeRecycler{ + kubeClient: client, + client: mockClient, + pluginMgr: plugMgr, + } + // adds the volume to the index, making the volume available syncVolume(volumeIndex, mockClient, pv) if pv.Status.Phase != api.VolumeAvailable { @@ -232,6 +245,31 @@ func TestBindingWithExamples(t *testing.T) { if pv.Status.Phase != api.VolumeReleased { t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase) } + if pv.Spec.ClaimRef == nil { + t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec) + } + + mockClient.volume = pv + + // released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing + err = recycler.reclaimVolume(pv) + if err != nil { + t.Errorf("Unexpected error reclaiming volume: %+v", err) + } + if pv.Status.Phase != api.VolumePending { + t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase) + } + + // after the recycling changes the phase to Pending, the binder picks up again + // to remove any vestiges of binding and make the volume Available again + syncVolume(volumeIndex, mockClient, pv) + + if pv.Status.Phase != api.VolumeAvailable { + t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase) + } + if pv.Spec.ClaimRef != nil { + t.Errorf("Expected nil ClaimRef: %+v", pv.Spec) + } } func TestMissingFromIndex(t *testing.T) { @@ -324,6 +362,11 @@ func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume) return volume, nil } +func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error { + c.volume = nil + return nil +} + func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { return volume, nil } @@ -343,3 +386,23 @@ func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolu func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) { return claim, nil } + +func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) { + return &mockRecycler{ + path: spec.PersistentVolumeSource.HostPath.Path, + }, nil +} + +type mockRecycler struct { + path string + host volume.VolumeHost +} + +func (r *mockRecycler) GetPath() string { + return r.path +} + +func (r *mockRecycler) Recycle() error { + // return nil means recycle passed + return nil +} diff --git a/pkg/volumeclaimbinder/persistent_volume_recycler.go b/pkg/volumeclaimbinder/persistent_volume_recycler.go new file mode 100644 index 0000000000..c1b6bff0f2 --- /dev/null +++ b/pkg/volumeclaimbinder/persistent_volume_recycler.go @@ -0,0 +1,231 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package volumeclaimbinder + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/volume" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/types" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount" + "github.com/golang/glog" +) + +// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims. +// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them +// available again for a new claim. +type PersistentVolumeRecycler struct { + volumeController *framework.Controller + stopChannel chan struct{} + client recyclerClient + kubeClient client.Interface + pluginMgr volume.VolumePluginMgr +} + +// PersistentVolumeRecycler creates a new PersistentVolumeRecycler +func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) { + recyclerClient := NewRecyclerClient(kubeClient) + recycler := &PersistentVolumeRecycler{ + client: recyclerClient, + kubeClient: kubeClient, + } + + if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil { + return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err) + } + + _, volumeController := framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion) + }, + }, + &api.PersistentVolume{}, + syncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + pv := obj.(*api.PersistentVolume) + recycler.reclaimVolume(pv) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + pv := newObj.(*api.PersistentVolume) + recycler.reclaimVolume(pv) + }, + }, + ) + + recycler.volumeController = volumeController + return recycler, nil +} + +func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error { + if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil { + glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name) + + latest, err := recycler.client.GetPersistentVolume(pv.Name) + if err != nil { + return fmt.Errorf("Could not find PersistentVolume %s", pv.Name) + } + if latest.Status.Phase != api.VolumeReleased { + return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased) + } + + // handleRecycle blocks until completion + // TODO: allow parallel recycling operations to increase throughput + // TODO implement handleDelete in a separate PR w/ cloud volumes + switch pv.Spec.PersistentVolumeReclaimPolicy { + case api.PersistentVolumeReclaimRecycle: + err = recycler.handleRecycle(pv) + case api.PersistentVolumeReclaimRetain: + glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name) + default: + err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv) + } + if err != nil { + errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err) + glog.Errorf(errMsg) + return fmt.Errorf(errMsg) + } + } + return nil +} + +func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error { + glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name) + + currentPhase := pv.Status.Phase + nextPhase := currentPhase + + spec := volume.NewSpecFromPersistentVolume(pv) + plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec) + if err != nil { + return fmt.Errorf("Could not find recyclable volume plugin for spec: %+v", err) + } + volRecycler, err := plugin.NewRecycler(spec) + if err != nil { + return fmt.Errorf("Could not obtain Recycler for spec: %+v", err) + } + // blocks until completion + err = volRecycler.Recycle() + if err != nil { + glog.Errorf("PersistentVolume[%s] failed recycling: %+v", err) + pv.Status.Message = fmt.Sprintf("Recycling error: %s", err) + nextPhase = api.VolumeFailed + } else { + glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name) + nextPhase = api.VolumePending + if err != nil { + glog.Errorf("Error updating pv.Status: %+v", err) + } + } + + if currentPhase != nextPhase { + glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase) + pv.Status.Phase = nextPhase + _, err := recycler.client.UpdatePersistentVolumeStatus(pv) + if err != nil { + // Rollback to previous phase + pv.Status.Phase = currentPhase + } + } + + return nil +} + +// Run starts this recycler's control loops +func (recycler *PersistentVolumeRecycler) Run() { + glog.V(5).Infof("Starting PersistentVolumeRecycler\n") + if recycler.stopChannel == nil { + recycler.stopChannel = make(chan struct{}) + go recycler.volumeController.Run(recycler.stopChannel) + } +} + +// Stop gracefully shuts down this binder +func (recycler *PersistentVolumeRecycler) Stop() { + glog.V(5).Infof("Stopping PersistentVolumeRecycler\n") + if recycler.stopChannel != nil { + close(recycler.stopChannel) + recycler.stopChannel = nil + } +} + +// recyclerClient abstracts access to PVs +type recyclerClient interface { + GetPersistentVolume(name string) (*api.PersistentVolume, error) + UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) + UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) +} + +func NewRecyclerClient(c client.Interface) recyclerClient { + return &realRecyclerClient{c} +} + +type realRecyclerClient struct { + client client.Interface +} + +func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Get(name) +} + +func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().Update(volume) +} + +func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) { + return c.client.PersistentVolumes().UpdateStatus(volume) +} + +// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes. +// Because no mounting is performed, most of the VolumeHost methods are not implemented. +func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string { + return "" +} + +func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string { + return "" +} + +func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string { + return "" +} + +func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface { + return f.kubeClient +} + +func (f *PersistentVolumeRecycler) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) { + return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation") +} + +func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) { + return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation") +} diff --git a/test/e2e/persistent_volumes.go b/test/e2e/persistent_volumes.go new file mode 100644 index 0000000000..95d080ac1b --- /dev/null +++ b/test/e2e/persistent_volumes.go @@ -0,0 +1,200 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "fmt" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "time" +) + +// Marked with [Skipped] to skip the test by default (see driver.go), +// the test needs privileged containers, which are disabled by default. +// Run the test with "go run hack/e2e.go ... --ginkgo.focus=PersistentVolume" +var _ = Describe("[Skipped] persistentVolumes", func() { + var c *client.Client + var ns string + + BeforeEach(func() { + var err error + c, err = loadClient() + Expect(err).NotTo(HaveOccurred()) + ns_, err := createTestingNS("pv", c) + ns = ns_.Name + Expect(err).NotTo(HaveOccurred()) + }) + + AfterEach(func() { + By(fmt.Sprintf("Destroying namespace for this suite %v", ns)) + if err := c.Namespaces().Delete(ns); err != nil { + Failf("Couldn't delete ns %s", err) + } + }) + + It("PersistentVolume", func() { + config := VolumeTestConfig{ + namespace: ns, + prefix: "nfs", + serverImage: "gcr.io/google_containers/volume-nfs", + serverPorts: []int{2049}, + } + + defer func() { + volumeTestCleanup(c, config) + }() + + pod := startVolumeServer(c, config) + serverIP := pod.Status.PodIP + Logf("NFS server IP address: %v", serverIP) + + pv := makePersistentVolume(serverIP) + pvc := makePersistentVolumeClaim(ns) + + Logf("Creating PersistentVolume using NFS") + pv, err := c.PersistentVolumes().Create(pv) + Expect(err).NotTo(HaveOccurred()) + + Logf("Creating PersistentVolumeClaim") + pvc, err = c.PersistentVolumeClaims(ns).Create(pvc) + Expect(err).NotTo(HaveOccurred()) + + // allow the binder a chance to catch up. should not be more than 20s. + waitForPersistentVolumePhase(api.VolumeBound, c, pv.Name, 1*time.Second, 30*time.Second) + + pv, err = c.PersistentVolumes().Get(pv.Name) + Expect(err).NotTo(HaveOccurred()) + if pv.Spec.ClaimRef == nil { + Failf("Expected PersistentVolume to be bound, but got nil ClaimRef: %+v", pv) + } + + Logf("Deleting PersistentVolumeClaim to trigger PV Recycling") + err = c.PersistentVolumeClaims(ns).Delete(pvc.Name) + Expect(err).NotTo(HaveOccurred()) + + // allow the recycler a chance to catch up. it has to perform NFS scrub, which can be slow in e2e. + waitForPersistentVolumePhase(api.VolumeAvailable, c, pv.Name, 5*time.Second, 300*time.Second) + + pv, err = c.PersistentVolumes().Get(pv.Name) + Expect(err).NotTo(HaveOccurred()) + if pv.Spec.ClaimRef != nil { + Failf("Expected PersistentVolume to be unbound, but found non-nil ClaimRef: %+v", pv) + } + + // The NFS Server pod we're using contains an index.html file + // Verify the file was really scrubbed from the volume + podTemplate := makeCheckPod(ns, serverIP) + checkpod, err := c.Pods(ns).Create(podTemplate) + expectNoError(err, "Failed to create checker pod: %v", err) + err = waitForPodSuccessInNamespace(c, checkpod.Name, checkpod.Spec.Containers[0].Name, checkpod.Namespace) + Expect(err).NotTo(HaveOccurred()) + }) +}) + +func makePersistentVolume(serverIP string) *api.PersistentVolume { + return &api.PersistentVolume{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "nfs-", + }, + Spec: api.PersistentVolumeSpec{ + PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle, + Capacity: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("2Gi"), + }, + PersistentVolumeSource: api.PersistentVolumeSource{ + NFS: &api.NFSVolumeSource{ + Server: serverIP, + Path: "/", + ReadOnly: false, + }, + }, + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + }, + } +} + +func makePersistentVolumeClaim(ns string) *api.PersistentVolumeClaim { + return &api.PersistentVolumeClaim{ + ObjectMeta: api.ObjectMeta{ + GenerateName: "pvc-", + Namespace: ns, + }, + Spec: api.PersistentVolumeClaimSpec{ + AccessModes: []api.PersistentVolumeAccessMode{ + api.ReadWriteOnce, + api.ReadOnlyMany, + api.ReadWriteMany, + }, + Resources: api.ResourceRequirements{ + Requests: api.ResourceList{ + api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi"), + }, + }, + }, + } +} + +func makeCheckPod(ns string, nfsserver string) *api.Pod { + // Prepare pod that mounts the NFS volume again and + // checks that /mnt/index.html was scrubbed there + return &api.Pod{ + TypeMeta: api.TypeMeta{ + Kind: "Pod", + APIVersion: "v1beta3", + }, + ObjectMeta: api.ObjectMeta{ + GenerateName: "checker-", + Namespace: ns, + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "scrub-checker", + Image: "busybox", + Command: []string{"/bin/sh"}, + Args: []string{"-c", "test ! -e /mnt/index.html || exit 1"}, + VolumeMounts: []api.VolumeMount{ + { + Name: "nfs-volume", + MountPath: "/mnt", + }, + }, + }, + }, + Volumes: []api.Volume{ + { + Name: "nfs-volume", + VolumeSource: api.VolumeSource{ + NFS: &api.NFSVolumeSource{ + Server: nfsserver, + Path: "/", + }, + }, + }, + }, + }, + } + +} diff --git a/test/e2e/util.go b/test/e2e/util.go index e9d6db1aca..3186c7cfa8 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -300,6 +300,26 @@ func waitForDefaultServiceAccountInNamespace(c *client.Client, namespace string) return waitForServiceAccountInNamespace(c, namespace, "default", serviceAccountPoll, serviceAccountProvisionTimeout) } +// waitForPersistentVolumePhase waits for a PersistentVolume to be in a specific phase or until timeout occurs, whichever comes first. +func waitForPersistentVolumePhase(phase api.PersistentVolumePhase, c *client.Client, pvName string, poll, timeout time.Duration) error { + Logf("Waiting up to %v for PersistentVolume %s to have phase %s", timeout, pvName, phase) + for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + pv, err := c.PersistentVolumes().Get(pvName) + if err != nil { + Logf("Get persistent volume %s in failed, ignoring for %v: %v", pvName, poll, err) + continue + } else { + if pv.Status.Phase == phase { + Logf("PersistentVolume %s found and phase=%s (%v)", pvName, phase, time.Since(start)) + return nil + } else { + Logf("PersistentVolume %s found but phase is %s instead of %s.", pvName, pv.Status.Phase, phase) + } + } + } + return fmt.Errorf("PersistentVolume %s not in phase %s within %v", pvName, phase, timeout) +} + // createNS should be used by every test, note that we append a common prefix to the provided test name. // Please see NewFramework instead of using this directly. func createTestingNS(baseName string, c *client.Client) (*api.Namespace, error) {