Merge pull request #9024 from markturansky/recyc_controllers

PersistentVolumeRecycler controller
pull/6/head
Quinton Hoole 2015-06-05 11:39:08 -07:00
commit 8b01ecb53a
37 changed files with 1281 additions and 22 deletions

View File

@ -10603,6 +10603,10 @@
"claimRef": {
"$ref": "v1.ObjectReference",
"description": "when bound, a reference to the bound claim"
},
"persistentVolumeReclaimPolicy": {
"type": "string",
"description": "persistentVolumeReclaimPolicy is what happens to a volume when released from its claim; Default is Retain."
}
}
},
@ -10807,6 +10811,14 @@
"phase": {
"type": "string",
"description": "the current phase of a persistent volume"
},
"message": {
"type": "string",
"description": "human-readable message indicating details about why the volume is in this state"
},
"reason": {
"type": "string",
"description": "(brief) reason the volume is not is not available, such as failed recycling"
}
}
},
@ -12041,4 +12053,4 @@
}
}
}
}
}

View File

@ -10600,6 +10600,10 @@
"claimRef": {
"$ref": "v1beta3.ObjectReference",
"description": "when bound, a reference to the bound claim"
},
"persistentVolumeReclaimPolicy": {
"type": "string",
"description": "persistentVolumeReclaimPolicy is what happens to a volume when released from its claim; Default is Retain."
}
}
},
@ -10804,6 +10808,14 @@
"phase": {
"type": "string",
"description": "the current phase of a persistent volume"
},
"message": {
"type": "string",
"description": "human-readable message indicating details about why the volume is in this state"
},
"reason": {
"type": "string",
"description": "(brief) reason the volume is not is not available, such as failed recycling"
}
}
},
@ -12047,4 +12059,4 @@
}
}
}
}
}

View File

@ -234,6 +234,11 @@ func (s *CMServer) Run(_ []string) error {
pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
pvclaimBinder.Run()
pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins())
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
}
pvRecycler.Run()
if len(s.ServiceAccountKeyFile) > 0 {
privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)

View File

@ -20,6 +20,8 @@ import (
// This file exists to force the desired plugin implementations to be linked.
// This should probably be part of some configuration fed into the build for a
// given binary target.
//Cloud providers
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/mesos"
@ -27,4 +29,21 @@ import (
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/rackspace"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"
// Volume plugins
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/nfs"
)
// ProbeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list.
func ProbeRecyclableVolumePlugins() []volume.VolumePlugin {
allPlugins := []volume.VolumePlugin{}
// The list of plugins to probe is decided by the kubelet binary, not
// by dynamic linking or other "magic". Plugins will be analyzed and
// initialized later.
allPlugins = append(allPlugins, host_path.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, nfs.ProbeVolumePlugins()...)
return allPlugins
}

View File

@ -6,8 +6,9 @@ metadata:
type: local
spec:
capacity:
storage: 5Gi
storage: 8Gi
accessModes:
- ReadWriteOnce
hostPath:
path: "/tmp/data02"
persistentVolumeReclaimPolicy: Recycle

View File

@ -1243,11 +1243,14 @@ func deepCopy_api_PersistentVolumeSpec(in PersistentVolumeSpec, out *PersistentV
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = in.PersistentVolumeReclaimPolicy
return nil
}
func deepCopy_api_PersistentVolumeStatus(in PersistentVolumeStatus, out *PersistentVolumeStatus, c *conversion.Cloner) error {
out.Phase = in.Phase
out.Message = in.Message
out.Reason = in.Reason
return nil
}

View File

@ -234,8 +234,11 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
},
func(pv *api.PersistentVolume, c fuzz.Continue) {
c.FuzzNoCustom(pv) // fuzz self without calling this function again
types := []api.PersistentVolumePhase{api.VolumePending, api.VolumeBound, api.VolumeReleased, api.VolumeAvailable}
types := []api.PersistentVolumePhase{api.VolumeAvailable, api.VolumePending, api.VolumeBound, api.VolumeReleased, api.VolumeFailed}
pv.Status.Phase = types[c.Rand.Intn(len(types))]
pv.Status.Message = c.RandString()
reclamationPolicies := []api.PersistentVolumeReclaimPolicy{api.PersistentVolumeReclaimRecycle, api.PersistentVolumeReclaimRetain}
pv.Spec.PersistentVolumeReclaimPolicy = reclamationPolicies[c.Rand.Intn(len(reclamationPolicies))]
},
func(pvc *api.PersistentVolumeClaim, c fuzz.Continue) {
c.FuzzNoCustom(pvc) // fuzz self without calling this function again

View File

@ -263,11 +263,33 @@ type PersistentVolumeSpec struct {
// ClaimRef is expected to be non-nil when bound.
// claim.VolumeName is the authoritative bind between PV and PVC.
ClaimRef *ObjectReference `json:"claimRef,omitempty"`
// Optional: what happens to a persistent volume when released from its claim.
PersistentVolumeReclaimPolicy PersistentVolumeReclaimPolicy `json:"persistentVolumeReclaimPolicy,omitempty" description:"what happens to a volume when released from its claim; Valid options are Retain (default) and Recycle. Recyling must be supported by the volume plugin underlying this persistent volume."`
}
// PersistentVolumeReclaimPolicy describes a policy for end-of-life maintenance of persistent volumes
type PersistentVolumeReclaimPolicy string
const (
// PersistentVolumeReclaimRecycle means the volume will be recycled back into the pool of unbound persistent volumes on release from its claim.
// The volume plugin must support Recycling.
PersistentVolumeReclaimRecycle PersistentVolumeReclaimPolicy = "Recycle"
// PersistentVolumeReclaimDelete means the volume will be deleted from Kubernetes on release from its claim.
// The volume plugin must support Deletion.
// TODO: implement w/ DeletableVolumePlugin
// PersistentVolumeReclaimDelete PersistentVolumeReclaimPolicy = "Delete"
// PersistentVolumeReclaimRetain means the volume will left in its current phase (Released) for manual reclamation by the administrator.
// The default policy is Retain.
PersistentVolumeReclaimRetain PersistentVolumeReclaimPolicy = "Retain"
)
type PersistentVolumeStatus struct {
// Phase indicates if a volume is available, bound to a claim, or released by a claim
Phase PersistentVolumePhase `json:"phase,omitempty"`
// A human-readable message indicating details about why the volume is in this state.
Message string `json:"message,omitempty"`
// Reason is a brief CamelCase string that describes any failure and is meant for machine parsing and tidy display in the CLI
Reason string `json:"reason,omitempty"`
}
type PersistentVolumeList struct {
@ -331,12 +353,16 @@ const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
// Available volumes are held by the binder and matched to PersistentVolumeClaims
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound
VolumeBound PersistentVolumePhase = "Bound"
// used for PersistentVolumes where the bound PersistentVolumeClaim was deleted
// released volumes must be recycled before becoming available again
// this phase is used by the persistent volume claim binder to signal to another process to reclaim the resource
VolumeReleased PersistentVolumePhase = "Released"
// used for PersistentVolumes that failed to be correctly recycled or deleted after being released from a claim
VolumeFailed PersistentVolumePhase = "Failed"
)
type PersistentVolumeClaimPhase string

View File

@ -1358,6 +1358,7 @@ func convert_api_PersistentVolumeSpec_To_v1_PersistentVolumeSpec(in *api.Persist
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy)
return nil
}
@ -1366,6 +1367,8 @@ func convert_api_PersistentVolumeStatus_To_v1_PersistentVolumeStatus(in *api.Per
defaulting.(func(*api.PersistentVolumeStatus))(in)
}
out.Phase = PersistentVolumePhase(in.Phase)
out.Message = in.Message
out.Reason = in.Reason
return nil
}
@ -3663,6 +3666,7 @@ func convert_v1_PersistentVolumeSpec_To_api_PersistentVolumeSpec(in *PersistentV
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy)
return nil
}
@ -3671,6 +3675,8 @@ func convert_v1_PersistentVolumeStatus_To_api_PersistentVolumeStatus(in *Persist
defaulting.(func(*PersistentVolumeStatus))(in)
}
out.Phase = api.PersistentVolumePhase(in.Phase)
out.Message = in.Message
out.Reason = in.Reason
return nil
}

View File

@ -1174,11 +1174,14 @@ func deepCopy_v1_PersistentVolumeSpec(in PersistentVolumeSpec, out *PersistentVo
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = in.PersistentVolumeReclaimPolicy
return nil
}
func deepCopy_v1_PersistentVolumeStatus(in PersistentVolumeStatus, out *PersistentVolumeStatus, c *conversion.Cloner) error {
out.Phase = in.Phase
out.Message = in.Message
out.Reason = in.Reason
return nil
}

View File

@ -113,6 +113,9 @@ func addDefaultingFuncs() {
if obj.Status.Phase == "" {
obj.Status.Phase = VolumePending
}
if obj.Spec.PersistentVolumeReclaimPolicy == "" {
obj.Spec.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimRetain
}
},
func(obj *PersistentVolumeClaim) {
if obj.Status.Phase == "" {

View File

@ -258,6 +258,9 @@ func TestSetDefaultPersistentVolume(t *testing.T) {
if pv2.Status.Phase != versioned.VolumePending {
t.Errorf("Expected volume phase %v, got %v", versioned.VolumePending, pv2.Status.Phase)
}
if pv2.Spec.PersistentVolumeReclaimPolicy != versioned.PersistentVolumeReclaimRetain {
t.Errorf("Expected pv reclaim policy %v, got %v", versioned.PersistentVolumeReclaimRetain, pv2.Spec.PersistentVolumeReclaimPolicy)
}
}
func TestSetDefaultPersistentVolumeClaim(t *testing.T) {

View File

@ -280,11 +280,33 @@ type PersistentVolumeSpec struct {
// ClaimRef is expected to be non-nil when bound.
// claim.VolumeName is the authoritative bind between PV and PVC.
ClaimRef *ObjectReference `json:"claimRef,omitempty" description:"when bound, a reference to the bound claim"`
// Optional: what happens to a persistent volume when released from its claim.
PersistentVolumeReclaimPolicy PersistentVolumeReclaimPolicy `json:"persistentVolumeReclaimPolicy,omitempty" description:"what happens to a volume when released from its claim; Valid options are Retain (default) and Recycle. Recyling must be supported by the volume plugin underlying this persistent volume."`
}
// PersistentVolumeReclaimPolicy describes a policy for end-of-life maintenance of persistent volumes
type PersistentVolumeReclaimPolicy string
const (
// PersistentVolumeReclaimRecycle means the volume will be recycled back into the pool of unbound persistent volumes on release from its claim.
// The volume plugin must support Recycling.
PersistentVolumeReclaimRecycle PersistentVolumeReclaimPolicy = "Recycle"
// PersistentVolumeReclaimDelete means the volume will be deleted from Kubernetes on release from its claim.
// The volume plugin must support Deletion.
// TODO: implement w/ DeletableVolumePlugin
// PersistentVolumeReclaimDelete PersistentVolumeReclaimPolicy = "Delete"
// PersistentVolumeReclaimRetain means the volume will left in its current phase (Released) for manual reclamation by the administrator.
// The default policy is Retain.
PersistentVolumeReclaimRetain PersistentVolumeReclaimPolicy = "Retain"
)
type PersistentVolumeStatus struct {
// Phase indicates if a volume is available, bound to a claim, or released by a claim
Phase PersistentVolumePhase `json:"phase,omitempty" description:"the current phase of a persistent volume"`
// A human-readable message indicating details about why the volume is in this state.
Message string `json:"message,omitempty" description:"human-readable message indicating details about why the volume is in this state"`
// Reason is a brief CamelCase string that describes any failure and is meant for machine parsing and tidy display in the CLI
Reason string `json:"reason,omitempty" description:"(brief) reason the volume is not is not available"`
}
type PersistentVolumeList struct {
@ -348,12 +370,16 @@ const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
// Available volumes are held by the binder and matched to PersistentVolumeClaims
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound
VolumeBound PersistentVolumePhase = "Bound"
// used for PersistentVolumes where the bound PersistentVolumeClaim was deleted
// released volumes must be recycled before becoming available again
// this phase is used by the persistent volume claim binder to signal to another process to reclaim the resource
VolumeReleased PersistentVolumePhase = "Released"
// used for PersistentVolumes that failed to be correctly recycled or deleted after being released from a claim
VolumeFailed PersistentVolumePhase = "Failed"
)
type PersistentVolumeClaimPhase string

View File

@ -1216,6 +1216,7 @@ func convert_api_PersistentVolumeSpec_To_v1beta3_PersistentVolumeSpec(in *api.Pe
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy)
return nil
}
@ -1224,6 +1225,8 @@ func convert_api_PersistentVolumeStatus_To_v1beta3_PersistentVolumeStatus(in *ap
defaulting.(func(*api.PersistentVolumeStatus))(in)
}
out.Phase = PersistentVolumePhase(in.Phase)
out.Message = in.Message
out.Reason = in.Reason
return nil
}
@ -3275,6 +3278,7 @@ func convert_v1beta3_PersistentVolumeSpec_To_api_PersistentVolumeSpec(in *Persis
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimPolicy(in.PersistentVolumeReclaimPolicy)
return nil
}
@ -3283,6 +3287,8 @@ func convert_v1beta3_PersistentVolumeStatus_To_api_PersistentVolumeStatus(in *Pe
defaulting.(func(*PersistentVolumeStatus))(in)
}
out.Phase = api.PersistentVolumePhase(in.Phase)
out.Message = in.Message
out.Reason = in.Reason
return nil
}

View File

@ -1178,11 +1178,14 @@ func deepCopy_v1beta3_PersistentVolumeSpec(in PersistentVolumeSpec, out *Persist
} else {
out.ClaimRef = nil
}
out.PersistentVolumeReclaimPolicy = in.PersistentVolumeReclaimPolicy
return nil
}
func deepCopy_v1beta3_PersistentVolumeStatus(in PersistentVolumeStatus, out *PersistentVolumeStatus, c *conversion.Cloner) error {
out.Phase = in.Phase
out.Message = in.Message
out.Reason = in.Reason
return nil
}

View File

@ -117,6 +117,9 @@ func addDefaultingFuncs() {
if obj.Status.Phase == "" {
obj.Status.Phase = VolumePending
}
if obj.Spec.PersistentVolumeReclaimPolicy == "" {
obj.Spec.PersistentVolumeReclaimPolicy = PersistentVolumeReclaimRetain
}
},
func(obj *PersistentVolumeClaim) {
if obj.Status.Phase == "" {

View File

@ -195,6 +195,9 @@ func TestSetDefaultPersistentVolume(t *testing.T) {
if pv2.Status.Phase != versioned.VolumePending {
t.Errorf("Expected volume phase %v, got %v", versioned.VolumePending, pv2.Status.Phase)
}
if pv2.Spec.PersistentVolumeReclaimPolicy != versioned.PersistentVolumeReclaimRetain {
t.Errorf("Expected pv reclaim policy %v, got %v", versioned.PersistentVolumeReclaimRetain, pv2.Spec.PersistentVolumeReclaimPolicy)
}
}
func TestSetDefaultPersistentVolumeClaim(t *testing.T) {

View File

@ -280,11 +280,33 @@ type PersistentVolumeSpec struct {
// ClaimRef is expected to be non-nil when bound.
// claim.VolumeName is the authoritative bind between PV and PVC.
ClaimRef *ObjectReference `json:"claimRef,omitempty" description:"when bound, a reference to the bound claim"`
// Optional: what happens to a persistent volume when released from its claim.
PersistentVolumeReclaimPolicy PersistentVolumeReclaimPolicy `json:"persistentVolumeReclaimPolicy,omitempty" description:"what happens to a volume when released from its claim; Valid options are Retain (default) and Recycle. Recyling must be supported by the volume plugin underlying this persistent volume."`
}
// PersistentVolumeReclaimPolicy describes a policy for end-of-life maintenance of persistent volumes
type PersistentVolumeReclaimPolicy string
const (
// PersistentVolumeReclaimRecycle means the volume will be recycled back into the pool of unbound persistent volumes on release from its claim.
// The volume plugin must support Recycling.
PersistentVolumeReclaimRecycle PersistentVolumeReclaimPolicy = "Recycle"
// PersistentVolumeReclaimDelete means the volume will be deleted from Kubernetes on release from its claim.
// The volume plugin must support Deletion.
// TODO: implement w/ DeletableVolumePlugin
// PersistentVolumeReclaimDelete PersistentVolumeReclaimPolicy = "Delete"
// PersistentVolumeReclaimRetain means the volume will left in its current phase (Released) for manual reclamation by the administrator.
// The default policy is Retain.
PersistentVolumeReclaimRetain PersistentVolumeReclaimPolicy = "Retain"
)
type PersistentVolumeStatus struct {
// Phase indicates if a volume is available, bound to a claim, or released by a claim
Phase PersistentVolumePhase `json:"phase,omitempty" description:"the current phase of a persistent volume"`
// A human-readable message indicating details about why the volume is in this state.
Message string `json:"message,omitempty" description:"human-readable message indicating details about why the volume is in this state"`
// Reason is a brief CamelCase string that describes any failure and is meant for machine parsing and tidy display in the CLI
Reason string `json:"reason,omitempty" description:"(brief) reason the volume is not is not available"`
}
type PersistentVolumeList struct {
@ -348,12 +370,16 @@ const (
// used for PersistentVolumes that are not available
VolumePending PersistentVolumePhase = "Pending"
// used for PersistentVolumes that are not yet bound
// Available volumes are held by the binder and matched to PersistentVolumeClaims
VolumeAvailable PersistentVolumePhase = "Available"
// used for PersistentVolumes that are bound
VolumeBound PersistentVolumePhase = "Bound"
// used for PersistentVolumes where the bound PersistentVolumeClaim was deleted
// released volumes must be recycled before becoming available again
// this phase is used by the persistent volume claim binder to signal to another process to reclaim the resource
VolumeReleased PersistentVolumePhase = "Released"
// used for PersistentVolumes that failed to be correctly recycled or deleted after being released from a claim
VolumeFailed PersistentVolumePhase = "Failed"
)
type PersistentVolumeClaimPhase string

View File

@ -148,7 +148,8 @@ func TestPersistentVolumeStatusUpdate(t *testing.T) {
},
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumeBound,
Phase: api.VolumeBound,
Message: "foo",
},
}
c := &testClient{

View File

@ -311,6 +311,8 @@ func (d *PersistentVolumeDescriber) Describe(namespace, name string) (string, er
} else {
fmt.Fprintf(out, "Claim:\t%s\n", "")
}
fmt.Fprintf(out, "Reclaim Policy:\t%d\n", pv.Spec.PersistentVolumeReclaimPolicy)
fmt.Fprintf(out, "Message:\t%d\n", pv.Status.Message)
return nil
})
}

View File

@ -259,7 +259,7 @@ var resourceQuotaColumns = []string{"NAME"}
var namespaceColumns = []string{"NAME", "LABELS", "STATUS"}
var secretColumns = []string{"NAME", "TYPE", "DATA"}
var serviceAccountColumns = []string{"NAME", "SECRETS"}
var persistentVolumeColumns = []string{"NAME", "LABELS", "CAPACITY", "ACCESSMODES", "STATUS", "CLAIM"}
var persistentVolumeColumns = []string{"NAME", "LABELS", "CAPACITY", "ACCESSMODES", "STATUS", "CLAIM", "REASON"}
var persistentVolumeClaimColumns = []string{"NAME", "LABELS", "STATUS", "VOLUME"}
var componentStatusColumns = []string{"NAME", "STATUS", "MESSAGE", "ERROR"}
@ -732,7 +732,7 @@ func printPersistentVolume(pv *api.PersistentVolume, w io.Writer, withNamespace
aQty := pv.Spec.Capacity[api.ResourceStorage]
aSize := aQty.Value()
_, err := fmt.Fprintf(w, "%s\t%s\t%d\t%s\t%s\t%s\n", name, formatLabels(pv.Labels), aSize, modesStr, pv.Status.Phase, claimRefUID)
_, err := fmt.Fprintf(w, "%s\t%s\t%d\t%s\t%s\t%s\t%s\n", name, formatLabels(pv.Labels), aSize, modesStr, pv.Status.Phase, claimRefUID, pv.Status.Reason)
return err
}

View File

@ -59,9 +59,12 @@ func validNewPersistentVolume(name string) *api.PersistentVolume {
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{Path: "/foo"},
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRetain,
},
Status: api.PersistentVolumeStatus{
Phase: api.VolumePending,
Phase: api.VolumePending,
Message: "bar",
Reason: "foo",
},
}
return pv

View File

@ -508,3 +508,11 @@ func GetClient(req *http.Request) string {
}
return "unknown"
}
func ShortenString(str string, n int) string {
if len(str) <= n {
return str
} else {
return str[:n]
}
}

View File

@ -18,23 +18,35 @@ package host_path
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
)
// This is the primary entrypoint for volume plugins.
// Tests covering recycling should not use this func but instead
// use their own array of plugins w/ a custom recyclerFunc as appropriate
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&hostPathPlugin{nil}}
return []volume.VolumePlugin{&hostPathPlugin{nil, newRecycler}}
}
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)) []volume.VolumePlugin {
return []volume.VolumePlugin{&hostPathPlugin{nil, recyclerFunc}}
}
type hostPathPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
}
var _ volume.VolumePlugin = &hostPathPlugin{}
var _ volume.PersistentVolumePlugin = &hostPathPlugin{}
var _ volume.RecyclableVolumePlugin = &hostPathPlugin{}
const (
hostPathPluginName = "kubernetes.io/host-path"
@ -70,6 +82,18 @@ func (plugin *hostPathPlugin) NewCleaner(volName string, podUID types.UID, _ mou
return &hostPath{""}, nil
}
func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
if spec.VolumeSource.HostPath != nil {
return &hostPathRecycler{spec.Name, spec.VolumeSource.HostPath.Path, host}, nil
} else {
return &hostPathRecycler{spec.Name, spec.PersistentVolumeSource.HostPath.Path, host}, nil
}
}
// HostPath volumes represent a bare host file or directory mount.
// The direct at the specified path will be directly exposed to the container.
type hostPath struct {
@ -99,3 +123,64 @@ func (hp *hostPath) TearDown() error {
func (hp *hostPath) TearDownAt(dir string) error {
return fmt.Errorf("TearDownAt() does not make sense for host paths")
}
// hostPathRecycler scrubs a hostPath volume by running "rm -rf" on the volume in a pod
// This recycler only works on a single host cluster and is for testing purposes only.
type hostPathRecycler struct {
name string
path string
host volume.VolumeHost
}
func (r *hostPathRecycler) GetPath() string {
return r.path
}
// Recycler provides methods to reclaim the volume resource.
// A HostPath is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. This is meant for
// development and testing in a single node cluster only.
// Recycle blocks until the pod has completed or any error occurs.
// The scrubber pod's is expected to succeed within 30 seconds when testing localhost.
func (r *hostPathRecycler) Recycle() error {
timeout := int64(30 * time.Second)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{r.path},
},
},
},
Containers: []api.Container{
{
Name: "scrubber",
Image: "busybox",
// delete the contents of the volume, but not the directory itself
Command: []string{"/bin/sh"},
// the scrubber:
// 1. validates the /scrub directory exists
// 2. creates a text file in the directory to be scrubbed
// 3. performs rm -rf on the directory
// 4. tests to see if the directory is empty
// the pod fails if the error code is returned
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
},
}
return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
}

View File

@ -59,6 +59,47 @@ func TestGetAccessModes(t *testing.T) {
}
}
func TestRecycler(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/foo"}}}
plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
t.Errorf("Can't find the plugin by name")
}
recycler, err := plug.NewRecycler(spec)
if err != nil {
t.Error("Failed to make a new Recyler: %v", err)
}
if recycler.GetPath() != spec.PersistentVolumeSource.HostPath.Path {
t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.HostPath.Path, recycler.GetPath())
}
if err := recycler.Recycle(); err != nil {
t.Errorf("Mock Recycler expected to return nil but got %s", err)
}
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolumeSource.HostPath.Path,
}, nil
}
type mockRecycler struct {
path string
host volume.VolumeHost
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}
func TestPlugin(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("fake", nil, nil))

View File

@ -19,25 +19,33 @@ package nfs
import (
"fmt"
"os"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/golang/glog"
)
// This is the primary entrypoint for volume plugins.
// Tests covering recycling should not use this func but instead
// use their own array of plugins w/ a custom recyclerFunc as appropriate
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&nfsPlugin{nil}}
return []volume.VolumePlugin{&nfsPlugin{nil, newRecycler}}
}
type nfsPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
}
var _ volume.VolumePlugin = &nfsPlugin{}
var _ volume.PersistentVolumePlugin = &nfsPlugin{}
var _ volume.RecyclableVolumePlugin = &nfsPlugin{}
const (
nfsPluginName = "kubernetes.io/nfs"
@ -103,6 +111,28 @@ func (plugin *nfsPlugin) newCleanerInternal(volName string, podUID types.UID, mo
}, nil
}
func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
if spec.VolumeSource.HostPath != nil {
return &nfsRecycler{
name: spec.Name,
server: spec.VolumeSource.NFS.Server,
path: spec.VolumeSource.NFS.Path,
host: host,
}, nil
} else {
return &nfsRecycler{
name: spec.Name,
server: spec.PersistentVolumeSource.NFS.Server,
path: spec.PersistentVolumeSource.NFS.Path,
host: host,
}, nil
}
}
// NFS volumes represent a bare host file or directory mount of an NFS export.
type nfs struct {
volName string
@ -112,6 +142,8 @@ type nfs struct {
readOnly bool
mounter mount.Interface
plugin *nfsPlugin
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
}
// SetUp attaches the disk and bind mounts to the volume path.
@ -199,3 +231,66 @@ func (nfsVolume *nfs) TearDownAt(dir string) error {
return nil
}
// nfsRecycler scrubs an NFS volume by running "rm -rf" on the volume in a pod.
type nfsRecycler struct {
name string
server string
path string
host volume.VolumeHost
}
func (r *nfsRecycler) GetPath() string {
return r.path
}
// Recycler provides methods to reclaim the volume resource.
// A NFS volume is recycled by scheduling a pod to run "rm -rf" on the contents of the volume.
// Recycle blocks until the pod has completed or any error occurs.
// The scrubber pod's is expected to succeed within 5 minutes else an error will be returned
func (r *nfsRecycler) Recycle() error {
timeout := int64(300 * time.Second) // 5 minutes
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
VolumeSource: api.VolumeSource{
NFS: &api.NFSVolumeSource{
Server: r.server,
Path: r.path,
},
},
},
},
Containers: []api.Container{
{
Name: "scrubber",
Image: "busybox",
// delete the contents of the volume, but not the directory itself
Command: []string{"/bin/sh"},
// the scrubber:
// 1. validates the /scrub directory exists
// 2. creates a text file to be scrubbed
// 3. performs rm -rf on the directory
// 4. tests to see if the directory is empty
// the pod fails if the error code is returned
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
},
}
return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
}

View File

@ -60,6 +60,47 @@ func TestGetAccessModes(t *testing.T) {
}
}
func TestRecycler(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{newRecyclerFunc: newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{NFS: &api.NFSVolumeSource{Path: "/foo"}}}
plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
t.Errorf("Can't find the plugin by name")
}
recycler, err := plug.NewRecycler(spec)
if err != nil {
t.Error("Failed to make a new Recyler: %v", err)
}
if recycler.GetPath() != spec.PersistentVolumeSource.NFS.Path {
t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.NFS.Path, recycler.GetPath())
}
if err := recycler.Recycle(); err != nil {
t.Errorf("Mock Recycler expected to return nil but got %s", err)
}
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolumeSource.NFS.Path,
}, nil
}
type mockRecycler struct {
path string
host volume.VolumeHost
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}
func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeAccessMode) bool {
for _, m := range modes {
if m == mode {

View File

@ -79,6 +79,15 @@ type PersistentVolumePlugin interface {
GetAccessModes() []api.PersistentVolumeAccessMode
}
// RecyclableVolumePlugin is an extended interface of VolumePlugin and is used
// by persistent volumes that want to be recycled before being made available again to new claims
type RecyclableVolumePlugin interface {
VolumePlugin
// NewRecycler creates a new volume.Recycler which knows how to reclaim this resource
// after the volume's release from a PersistentVolumeClaim
NewRecycler(spec *Spec) (Recycler, error)
}
// VolumeHost is an interface that plugins can use to access the kubelet.
type VolumeHost interface {
// GetPluginDir returns the absolute path to a directory under which
@ -217,7 +226,20 @@ func (pm *VolumePluginMgr) FindPluginByName(name string) (VolumePlugin, error) {
return pm.plugins[matches[0]], nil
}
// FindPluginByName fetches a plugin by name or by legacy name. If no plugin
// FindPersistentPluginBySpec looks for a persistent volume plugin that can support a given volume
// specification. If no plugin is found, return an error
func (pm *VolumePluginMgr) FindPersistentPluginBySpec(spec *Spec) (PersistentVolumePlugin, error) {
volumePlugin, err := pm.FindPluginBySpec(spec)
if err != nil {
return nil, fmt.Errorf("Could not find volume plugin for spec: %+v", spec)
}
if persistentVolumePlugin, ok := volumePlugin.(PersistentVolumePlugin); ok {
return persistentVolumePlugin, nil
}
return nil, fmt.Errorf("no persistent volume plugin matched")
}
// FindPersistentPluginByName fetches a persistent volume plugin by name. If no plugin
// is found, returns error.
func (pm *VolumePluginMgr) FindPersistentPluginByName(name string) (PersistentVolumePlugin, error) {
volumePlugin, err := pm.FindPluginByName(name)
@ -227,5 +249,18 @@ func (pm *VolumePluginMgr) FindPersistentPluginByName(name string) (PersistentVo
if persistentVolumePlugin, ok := volumePlugin.(PersistentVolumePlugin); ok {
return persistentVolumePlugin, nil
}
return nil, fmt.Errorf("no persistent volume plugin matched")
return nil, fmt.Errorf("no persistent volume plugin matched: %+v")
}
// FindRecyclablePluginByName fetches a persistent volume plugin by name. If no plugin
// is found, returns error.
func (pm *VolumePluginMgr) FindRecyclablePluginBySpec(spec *Spec) (RecyclableVolumePlugin, error) {
volumePlugin, err := pm.FindPluginBySpec(spec)
if err != nil {
return nil, err
}
if recyclableVolumePlugin, ok := volumePlugin.(RecyclableVolumePlugin); ok {
return recyclableVolumePlugin, nil
}
return nil, fmt.Errorf("no recyclable volume plugin matched")
}

View File

@ -82,6 +82,7 @@ type FakeVolumePlugin struct {
}
var _ VolumePlugin = &FakeVolumePlugin{}
var _ RecyclableVolumePlugin = &FakeVolumePlugin{}
func (plugin *FakeVolumePlugin) Init(host VolumeHost) {
plugin.Host = host
@ -104,6 +105,10 @@ func (plugin *FakeVolumePlugin) NewCleaner(volName string, podUID types.UID, mou
return &FakeVolume{podUID, volName, plugin}, nil
}
func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) {
return &FakeRecycler{"/attributesTransferredFromSpec"}, nil
}
func (plugin *FakeVolumePlugin) GetAccessModes() []api.PersistentVolumeAccessMode {
return []api.PersistentVolumeAccessMode{}
}
@ -133,3 +138,16 @@ func (fv *FakeVolume) TearDown() error {
func (fv *FakeVolume) TearDownAt(dir string) error {
return os.RemoveAll(dir)
}
type FakeRecycler struct {
path string
}
func (fr *FakeRecycler) Recycle() error {
// nil is success, else error
return nil
}
func (fr *FakeRecycler) GetPath() string {
return fr.path
}

View File

@ -17,7 +17,18 @@ limitations under the License.
package volume
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
func GetAccessModesAsString(modes []api.PersistentVolumeAccessMode) string {
@ -51,3 +62,89 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
}
return false
}
// ScrubPodVolumeAndWatchUntilCompletion is intended for use with volume Recyclers. This function will
// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first.
// An attempt to delete a scrubber pod is always attempted before returning.
// pod - the pod designed by a volume plugin to scrub the volume's contents
// client - kube client for API operations.
func ScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, kubeClient client.Interface) error {
return internalScrubPodVolumeAndWatchUntilCompletion(pod, newScrubberClient(kubeClient))
}
// same as above func comments, except 'scrubberClient' is a narrower pod API interface to ease testing
func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient scrubberClient) error {
glog.V(5).Infof("Creating scrubber pod for volume %s\n", pod.Name)
pod, err := scrubberClient.CreatePod(pod)
if err != nil {
return fmt.Errorf("Unexpected error creating a pod to scrub volume %s: %+v\n", pod.Name, err)
}
defer scrubberClient.DeletePod(pod.Name, pod.Namespace)
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion)
for {
watchedPod := nextPod()
if watchedPod.Status.Phase == api.PodSucceeded {
// volume.Recycle() returns nil on success, else error
return nil
}
if watchedPod.Status.Phase == api.PodFailed {
// volume.Recycle() returns nil on success, else error
if watchedPod.Status.Message != "" {
return fmt.Errorf(watchedPod.Status.Message)
} else {
return fmt.Errorf("Pod failed, pod.Status.Message unknown.")
}
}
}
}
// scrubberClient abstracts access to a Pod by providing a narrower interface.
// this makes it easier to mock a client for testing
type scrubberClient interface {
CreatePod(pod *api.Pod) (*api.Pod, error)
GetPod(name, namespace string) (*api.Pod, error)
DeletePod(name, namespace string) error
WatchPod(name, namespace, resourceVersion string) func() *api.Pod
}
func newScrubberClient(client client.Interface) scrubberClient {
return &realScrubberClient{client}
}
type realScrubberClient struct {
client client.Interface
}
func (c *realScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
return c.client.Pods(pod.Namespace).Create(pod)
}
func (c *realScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
return c.client.Pods(namespace).Get(name)
}
func (c *realScrubberClient) DeletePod(name, namespace string) error {
return c.client.Pods(namespace).Delete(name, nil)
}
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod {
fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
podLW := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.client.Pods(namespace).List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).Run()
return func() *api.Pod {
obj := queue.Pop()
return obj.(*api.Pod)
}
}

102
pkg/volume/util_test.go Normal file
View File

@ -0,0 +1,102 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volume
import (
"fmt"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"strings"
)
func TestScrubberSuccess(t *testing.T) {
client := &mockScrubberClient{}
scrubber := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "scrubber-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
Phase: api.PodSucceeded,
},
}
err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client)
if err != nil {
t.Errorf("Unexpected error watching scrubber pod: %+v", err)
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on scrubber pod")
}
}
func TestScrubberFailure(t *testing.T) {
client := &mockScrubberClient{}
scrubber := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "scrubber-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
Phase: api.PodFailed,
Message: "foo",
},
}
err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client)
if err == nil {
t.Fatalf("Expected pod failure but got nil error returned")
}
if err != nil {
if !strings.Contains(err.Error(), "foo") {
t.Errorf("Expected pod.Status.Message %s but got %s", scrubber.Status.Message, err)
}
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on scrubber pod")
}
}
type mockScrubberClient struct {
pod *api.Pod
deletedCalled bool
}
func (c *mockScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
c.pod = pod
return c.pod, nil
}
func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
if c.pod != nil {
return c.pod, nil
} else {
return nil, fmt.Errorf("pod does not exist")
}
}
func (c *mockScrubberClient) DeletePod(name, namespace string) error {
c.deletedCalled = true
return nil
}
func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod {
return func() *api.Pod {
return c.pod
}
}

View File

@ -29,7 +29,7 @@ type Volume interface {
GetPath() string
}
// Builder interface provides method to set up/mount the volume.
// Builder interface provides methods to set up/mount the volume.
type Builder interface {
// Uses Interface to provide the path for Docker binds.
Volume
@ -43,7 +43,7 @@ type Builder interface {
SetUpAt(dir string) error
}
// Cleaner interface provides method to cleanup/unmount the volumes.
// Cleaner interface provides methods to cleanup/unmount the volumes.
type Cleaner interface {
Volume
// TearDown unmounts the volume from a self-determined directory and
@ -54,6 +54,14 @@ type Cleaner interface {
TearDownAt(dir string) error
}
// Recycler provides methods to reclaim the volume resource.
type Recycler interface {
Volume
// Recycle reclaims the resource. Calls to this method should block until the recycling task is complete.
// Any error returned indicates the volume has failed to be reclaimed. A nil return indicates success.
Recycle() error
}
func RenameDirectory(oldPath, newName string) (string, error) {
newPath, err := ioutil.TempDir(path.Dir(oldPath), newName)
if err != nil {

View File

@ -99,7 +99,10 @@ func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
volume := obj.(*api.PersistentVolume)
syncVolume(binder.volumeIndex, binder.client, volume)
err := syncVolume(binder.volumeIndex, binder.client, volume)
if err != nil {
glog.Errorf("PVClaimBinder could not add volume %s: %+v", volume.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {
@ -107,7 +110,10 @@ func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface
defer binder.lock.Unlock()
newVolume := newObj.(*api.PersistentVolume)
binder.volumeIndex.Update(newVolume)
syncVolume(binder.volumeIndex, binder.client, newVolume)
err := syncVolume(binder.volumeIndex, binder.client, newVolume)
if err != nil {
glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
@ -121,18 +127,24 @@ func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
claim := obj.(*api.PersistentVolumeClaim)
syncClaim(binder.volumeIndex, binder.client, claim)
err := syncClaim(binder.volumeIndex, binder.client, claim)
if err != nil {
glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err)
}
}
func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
binder.lock.Lock()
defer binder.lock.Unlock()
newClaim := newObj.(*api.PersistentVolumeClaim)
syncClaim(binder.volumeIndex, binder.client, newClaim)
err := syncClaim(binder.volumeIndex, binder.client, newClaim)
if err != nil {
glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err)
}
}
func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)
// volumes can be in one of the following states:
//
@ -140,12 +152,28 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
// VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler
currentPhase := volume.Status.Phase
nextPhase := currentPhase
switch currentPhase {
// pending volumes are available only after indexing in order to be matched to claims.
case api.VolumePending:
if volume.Spec.ClaimRef != nil {
// Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending
// to start the volume again at the beginning of this lifecycle.
// ClaimRef is the last bind between persistent volume and claim.
// The claim has already been deleted by the user at this point
oldClaimRef := volume.Spec.ClaimRef
volume.Spec.ClaimRef = nil
_, err = binderClient.UpdatePersistentVolume(volume)
if err != nil {
// rollback on error, keep the ClaimRef until we can successfully update the volume
volume.Spec.ClaimRef = oldClaimRef
return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
}
}
_, exists, err := volumeIndex.Get(volume)
if err != nil {
return err
@ -178,10 +206,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
}
}
}
//bound volumes require verification of their bound claims
case api.VolumeBound:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
if err != nil {
@ -192,12 +221,22 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
}
}
}
// released volumes require recycling
case api.VolumeReleased:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
} else {
// another process is watching for released volumes.
// PersistentVolumeReclaimPolicy is set per PersistentVolume
}
// volumes are removed by processes external to this binder and must be removed from the cluster
case api.VolumeFailed:
if volume.Spec.ClaimRef == nil {
return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
} else {
// TODO: implement Recycle method on plugins
glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name)
}
}
@ -206,6 +245,7 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
// a change in state will trigger another update through this controller.
// each pass through this controller evaluates current phase and decides whether or not to change to the next phase
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase)
volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
if err != nil {
// Rollback to previous phase
@ -262,6 +302,7 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
}
if volume.Spec.ClaimRef == nil {
glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n")
claimRef, err := api.GetReference(claim)
if err != nil {
return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
@ -326,6 +367,7 @@ func (controller *PersistentVolumeClaimBinder) Stop() {
type binderClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
DeletePersistentVolume(volume *api.PersistentVolume) error
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
@ -348,6 +390,10 @@ func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume)
return c.client.PersistentVolumes().Update(volume)
}
func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
return c.client.PersistentVolumes().Delete(volume.Name)
}
func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}

View File

@ -26,6 +26,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path"
)
func TestRunStop(t *testing.T) {
@ -98,13 +100,14 @@ func TestExampleObjects(t *testing.T) {
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("5Gi"),
api.ResourceName(api.ResourceStorage): resource.MustParse("8Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/tmp/data02",
},
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
},
},
},
@ -179,6 +182,7 @@ func TestBindingWithExamples(t *testing.T) {
client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}
pv, err := client.PersistentVolumes().Get("any")
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle
if err != nil {
t.Error("Unexpected error getting PV from client: %v", err)
}
@ -194,6 +198,15 @@ func TestBindingWithExamples(t *testing.T) {
claim: claim,
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
recycler := &PersistentVolumeRecycler{
kubeClient: client,
client: mockClient,
pluginMgr: plugMgr,
}
// adds the volume to the index, making the volume available
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
@ -232,6 +245,31 @@ func TestBindingWithExamples(t *testing.T) {
if pv.Status.Phase != api.VolumeReleased {
t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
}
if pv.Spec.ClaimRef == nil {
t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec)
}
mockClient.volume = pv
// released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected error reclaiming volume: %+v", err)
}
if pv.Status.Phase != api.VolumePending {
t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase)
}
// after the recycling changes the phase to Pending, the binder picks up again
// to remove any vestiges of binding and make the volume Available again
syncVolume(volumeIndex, mockClient, pv)
if pv.Status.Phase != api.VolumeAvailable {
t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase)
}
if pv.Spec.ClaimRef != nil {
t.Errorf("Expected nil ClaimRef: %+v", pv.Spec)
}
}
func TestMissingFromIndex(t *testing.T) {
@ -324,6 +362,11 @@ func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume)
return volume, nil
}
func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
c.volume = nil
return nil
}
func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return volume, nil
}
@ -343,3 +386,23 @@ func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolu
func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
return claim, nil
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolumeSource.HostPath.Path,
}, nil
}
type mockRecycler struct {
path string
host volume.VolumeHost
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}

View File

@ -0,0 +1,231 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volumeclaimbinder
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/golang/glog"
)
// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims.
// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them
// available again for a new claim.
type PersistentVolumeRecycler struct {
volumeController *framework.Controller
stopChannel chan struct{}
client recyclerClient
kubeClient client.Interface
pluginMgr volume.VolumePluginMgr
}
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
kubeClient: kubeClient,
}
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err)
}
_, volumeController := framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
},
},
&api.PersistentVolume{},
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pv := obj.(*api.PersistentVolume)
recycler.reclaimVolume(pv)
},
UpdateFunc: func(oldObj, newObj interface{}) {
pv := newObj.(*api.PersistentVolume)
recycler.reclaimVolume(pv)
},
},
)
recycler.volumeController = volumeController
return recycler, nil
}
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil {
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
latest, err := recycler.client.GetPersistentVolume(pv.Name)
if err != nil {
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
}
if latest.Status.Phase != api.VolumeReleased {
return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased)
}
// handleRecycle blocks until completion
// TODO: allow parallel recycling operations to increase throughput
// TODO implement handleDelete in a separate PR w/ cloud volumes
switch pv.Spec.PersistentVolumeReclaimPolicy {
case api.PersistentVolumeReclaimRecycle:
err = recycler.handleRecycle(pv)
case api.PersistentVolumeReclaimRetain:
glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name)
default:
err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv)
}
if err != nil {
errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err)
glog.Errorf(errMsg)
return fmt.Errorf(errMsg)
}
}
return nil
}
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
currentPhase := pv.Status.Phase
nextPhase := currentPhase
spec := volume.NewSpecFromPersistentVolume(pv)
plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
return fmt.Errorf("Could not find recyclable volume plugin for spec: %+v", err)
}
volRecycler, err := plugin.NewRecycler(spec)
if err != nil {
return fmt.Errorf("Could not obtain Recycler for spec: %+v", err)
}
// blocks until completion
err = volRecycler.Recycle()
if err != nil {
glog.Errorf("PersistentVolume[%s] failed recycling: %+v", err)
pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
nextPhase = api.VolumeFailed
} else {
glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
nextPhase = api.VolumePending
if err != nil {
glog.Errorf("Error updating pv.Status: %+v", err)
}
}
if currentPhase != nextPhase {
glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
pv.Status.Phase = nextPhase
_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
if err != nil {
// Rollback to previous phase
pv.Status.Phase = currentPhase
}
}
return nil
}
// Run starts this recycler's control loops
func (recycler *PersistentVolumeRecycler) Run() {
glog.V(5).Infof("Starting PersistentVolumeRecycler\n")
if recycler.stopChannel == nil {
recycler.stopChannel = make(chan struct{})
go recycler.volumeController.Run(recycler.stopChannel)
}
}
// Stop gracefully shuts down this binder
func (recycler *PersistentVolumeRecycler) Stop() {
glog.V(5).Infof("Stopping PersistentVolumeRecycler\n")
if recycler.stopChannel != nil {
close(recycler.stopChannel)
recycler.stopChannel = nil
}
}
// recyclerClient abstracts access to PVs
type recyclerClient interface {
GetPersistentVolume(name string) (*api.PersistentVolume, error)
UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
}
func NewRecyclerClient(c client.Interface) recyclerClient {
return &realRecyclerClient{c}
}
type realRecyclerClient struct {
client client.Interface
}
func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Get(name)
}
func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().Update(volume)
}
func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
return c.client.PersistentVolumes().UpdateStatus(volume)
}
// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string {
return ""
}
func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface {
return f.kubeClient
}
func (f *PersistentVolumeRecycler) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation")
}
func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation")
}

View File

@ -0,0 +1,200 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"time"
)
// Marked with [Skipped] to skip the test by default (see driver.go),
// the test needs privileged containers, which are disabled by default.
// Run the test with "go run hack/e2e.go ... --ginkgo.focus=PersistentVolume"
var _ = Describe("[Skipped] persistentVolumes", func() {
var c *client.Client
var ns string
BeforeEach(func() {
var err error
c, err = loadClient()
Expect(err).NotTo(HaveOccurred())
ns_, err := createTestingNS("pv", c)
ns = ns_.Name
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
By(fmt.Sprintf("Destroying namespace for this suite %v", ns))
if err := c.Namespaces().Delete(ns); err != nil {
Failf("Couldn't delete ns %s", err)
}
})
It("PersistentVolume", func() {
config := VolumeTestConfig{
namespace: ns,
prefix: "nfs",
serverImage: "gcr.io/google_containers/volume-nfs",
serverPorts: []int{2049},
}
defer func() {
volumeTestCleanup(c, config)
}()
pod := startVolumeServer(c, config)
serverIP := pod.Status.PodIP
Logf("NFS server IP address: %v", serverIP)
pv := makePersistentVolume(serverIP)
pvc := makePersistentVolumeClaim(ns)
Logf("Creating PersistentVolume using NFS")
pv, err := c.PersistentVolumes().Create(pv)
Expect(err).NotTo(HaveOccurred())
Logf("Creating PersistentVolumeClaim")
pvc, err = c.PersistentVolumeClaims(ns).Create(pvc)
Expect(err).NotTo(HaveOccurred())
// allow the binder a chance to catch up. should not be more than 20s.
waitForPersistentVolumePhase(api.VolumeBound, c, pv.Name, 1*time.Second, 30*time.Second)
pv, err = c.PersistentVolumes().Get(pv.Name)
Expect(err).NotTo(HaveOccurred())
if pv.Spec.ClaimRef == nil {
Failf("Expected PersistentVolume to be bound, but got nil ClaimRef: %+v", pv)
}
Logf("Deleting PersistentVolumeClaim to trigger PV Recycling")
err = c.PersistentVolumeClaims(ns).Delete(pvc.Name)
Expect(err).NotTo(HaveOccurred())
// allow the recycler a chance to catch up. it has to perform NFS scrub, which can be slow in e2e.
waitForPersistentVolumePhase(api.VolumeAvailable, c, pv.Name, 5*time.Second, 300*time.Second)
pv, err = c.PersistentVolumes().Get(pv.Name)
Expect(err).NotTo(HaveOccurred())
if pv.Spec.ClaimRef != nil {
Failf("Expected PersistentVolume to be unbound, but found non-nil ClaimRef: %+v", pv)
}
// The NFS Server pod we're using contains an index.html file
// Verify the file was really scrubbed from the volume
podTemplate := makeCheckPod(ns, serverIP)
checkpod, err := c.Pods(ns).Create(podTemplate)
expectNoError(err, "Failed to create checker pod: %v", err)
err = waitForPodSuccessInNamespace(c, checkpod.Name, checkpod.Spec.Containers[0].Name, checkpod.Namespace)
Expect(err).NotTo(HaveOccurred())
})
})
func makePersistentVolume(serverIP string) *api.PersistentVolume {
return &api.PersistentVolume{
ObjectMeta: api.ObjectMeta{
GenerateName: "nfs-",
},
Spec: api.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
Capacity: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("2Gi"),
},
PersistentVolumeSource: api.PersistentVolumeSource{
NFS: &api.NFSVolumeSource{
Server: serverIP,
Path: "/",
ReadOnly: false,
},
},
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
},
}
}
func makePersistentVolumeClaim(ns string) *api.PersistentVolumeClaim {
return &api.PersistentVolumeClaim{
ObjectMeta: api.ObjectMeta{
GenerateName: "pvc-",
Namespace: ns,
},
Spec: api.PersistentVolumeClaimSpec{
AccessModes: []api.PersistentVolumeAccessMode{
api.ReadWriteOnce,
api.ReadOnlyMany,
api.ReadWriteMany,
},
Resources: api.ResourceRequirements{
Requests: api.ResourceList{
api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi"),
},
},
},
}
}
func makeCheckPod(ns string, nfsserver string) *api.Pod {
// Prepare pod that mounts the NFS volume again and
// checks that /mnt/index.html was scrubbed there
return &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
APIVersion: "v1beta3",
},
ObjectMeta: api.ObjectMeta{
GenerateName: "checker-",
Namespace: ns,
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "scrub-checker",
Image: "busybox",
Command: []string{"/bin/sh"},
Args: []string{"-c", "test ! -e /mnt/index.html || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "nfs-volume",
MountPath: "/mnt",
},
},
},
},
Volumes: []api.Volume{
{
Name: "nfs-volume",
VolumeSource: api.VolumeSource{
NFS: &api.NFSVolumeSource{
Server: nfsserver,
Path: "/",
},
},
},
},
},
}
}

View File

@ -300,6 +300,26 @@ func waitForDefaultServiceAccountInNamespace(c *client.Client, namespace string)
return waitForServiceAccountInNamespace(c, namespace, "default", serviceAccountPoll, serviceAccountProvisionTimeout)
}
// waitForPersistentVolumePhase waits for a PersistentVolume to be in a specific phase or until timeout occurs, whichever comes first.
func waitForPersistentVolumePhase(phase api.PersistentVolumePhase, c *client.Client, pvName string, poll, timeout time.Duration) error {
Logf("Waiting up to %v for PersistentVolume %s to have phase %s", timeout, pvName, phase)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
pv, err := c.PersistentVolumes().Get(pvName)
if err != nil {
Logf("Get persistent volume %s in failed, ignoring for %v: %v", pvName, poll, err)
continue
} else {
if pv.Status.Phase == phase {
Logf("PersistentVolume %s found and phase=%s (%v)", pvName, phase, time.Since(start))
return nil
} else {
Logf("PersistentVolume %s found but phase is %s instead of %s.", pvName, pv.Status.Phase, phase)
}
}
}
return fmt.Errorf("PersistentVolume %s not in phase %s within %v", pvName, phase, timeout)
}
// createNS should be used by every test, note that we append a common prefix to the provided test name.
// Please see NewFramework instead of using this directly.
func createTestingNS(baseName string, c *client.Client) (*api.Namespace, error) {