Retry recycle or delete operation on failure.

The recycle controller tries to recycle or delete a PV several times.
It stores the count of failed attempts and the timestamp of the last
attempt in a local in-memory cache, keyed by volume name.

By default, the controller tries to recycle/delete a PV 3 times, with a
10 minute interval between attempts. These values are configurable via the
kube-controller-manager --pv-recycler-maximum-retry=X and
--pvclaimbinder-sync-period=Y arguments.
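
For example, to allow five attempts with a five-minute pause between them
(values chosen purely for illustration):

  kube-controller-manager --pv-recycler-maximum-retry=5 --pvclaimbinder-sync-period=5m

Since --pvclaimbinder-sync-period is also the claim binder's sync period, the
retry interval is not an independent knob.
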
Jan Safranek 2016-02-05 17:02:13 +01:00
parent faa0fc3d8c
commit 76b6449715
8 changed files with 332 additions and 46 deletions

View File

@@ -306,7 +306,9 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags),
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod,
s.PersistentVolumeControllerOptions.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry,
ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags),
cloud,
)
if err != nil {

View File

@@ -256,6 +256,7 @@ func (s *CMServer) Run(_ []string) error {
pvRecycler, err := persistentvolumecontroller.NewPersistentVolumeRecycler(
clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "persistent-volume-recycler")),
s.PersistentVolumeControllerOptions.PVClaimBinderSyncPeriod,
s.PersistentVolumeControllerOptions.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry,
kubecontrollermanager.ProbeRecyclableVolumePlugins(s.PersistentVolumeControllerOptions.VolumeConfigFlags), cloud)
if err != nil {
glog.Fatalf("Failed to start persistent volume recycler: %+v", err)

View File

@@ -91,6 +91,7 @@ kube-controller-manager
--port=10252: The port that the controller-manager's http service runs on
--profiling[=true]: Enable profiling via web interface host:port/debug/pprof/
--pv-recycler-increment-timeout-nfs=30: the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod
--pv-recycler-maximum-retry=3: Maximum number of attempts to recycle or delete a persistent volume
--pv-recycler-minimum-timeout-hostpath=60: The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.
--pv-recycler-minimum-timeout-nfs=300: The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod
--pv-recycler-pod-template-filepath-hostpath="": The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.

View File

@@ -279,6 +279,7 @@ pv-recycler-minimum-timeout-hostpath
pv-recycler-minimum-timeout-nfs
pv-recycler-pod-template-filepath-hostpath
pv-recycler-pod-template-filepath-nfs
pv-recycler-maximum-retry
pv-recycler-timeout-increment-hostpath
pvclaimbinder-sync-period
read-only-port

View File

@@ -27,6 +27,7 @@ import (
// of volume.VolumeConfig which are then passed to the appropriate plugin. The ControllerManager binary is the only
// part of the code which knows what plugins are supported and which CLI flags correspond to each plugin.
type VolumeConfigFlags struct {
PersistentVolumeRecyclerMaximumRetry int
PersistentVolumeRecyclerMinimumTimeoutNFS int
PersistentVolumeRecyclerPodTemplateFilePathNFS string
PersistentVolumeRecyclerIncrementTimeoutNFS int
@@ -46,6 +47,7 @@ func NewPersistentVolumeControllerOptions() PersistentVolumeControllerOptions {
PVClaimBinderSyncPeriod: 10 * time.Minute,
VolumeConfigFlags: VolumeConfigFlags{
// default values here
PersistentVolumeRecyclerMaximumRetry: 3,
PersistentVolumeRecyclerMinimumTimeoutNFS: 300,
PersistentVolumeRecyclerIncrementTimeoutNFS: 30,
PersistentVolumeRecyclerMinimumTimeoutHostPath: 60,
@@ -76,6 +78,9 @@ func (o *PersistentVolumeControllerOptions) AddFlags(fs *pflag.FlagSet) {
o.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath,
"the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. "+
"This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&o.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry, "pv-recycler-maximum-retry",
o.VolumeConfigFlags.PersistentVolumeRecyclerMaximumRetry,
"Maximum number of attempts to recycle or delete a persistent volume")
fs.BoolVar(&o.VolumeConfigFlags.EnableHostPathProvisioning, "enable-hostpath-provisioner", o.VolumeConfigFlags.EnableHostPathProvisioning,
"Enable HostPath PV provisioning when running without a cloud provider. This allows testing and development of provisioning features. "+
"HostPath provisioning is not supported in any way, won't work in a multi-node cluster, and should not be used for anything other than testing or development.")

View File

@@ -46,15 +46,33 @@ type PersistentVolumeRecycler struct {
kubeClient clientset.Interface
pluginMgr volume.VolumePluginMgr
cloud cloudprovider.Interface
maximumRetry int
syncPeriod time.Duration
// Local cache of failed recycle / delete operations. Map volume.Name -> status of the volume.
// Only PVs in Released state have an entry here.
releasedVolumes map[string]releasedVolumeStatus
}
// PersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
// releasedVolumeStatus holds the state of a failed delete/recycle operation
// on a volume. The controller retries the operation several times and stores
// the retry count and the timestamp of the last attempt here.
type releasedVolumeStatus struct {
// How many recycle/delete operations failed.
retryCount int
// Timestamp of the last attempt.
lastAttempt time.Time
}
// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time.Duration, maximumRetry int, plugins []volume.VolumePlugin, cloud cloudprovider.Interface) (*PersistentVolumeRecycler, error) {
recyclerClient := NewRecyclerClient(kubeClient)
recycler := &PersistentVolumeRecycler{
client: recyclerClient,
kubeClient: kubeClient,
cloud: cloud,
client: recyclerClient,
kubeClient: kubeClient,
cloud: cloud,
maximumRetry: maximumRetry,
syncPeriod: syncPeriod,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
@@ -89,6 +107,14 @@ func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time
}
recycler.reclaimVolume(pv)
},
DeleteFunc: func(obj interface{}) {
pv, ok := obj.(*api.PersistentVolume)
if !ok {
glog.Errorf("Error casting object to PersistentVolume: %v", obj)
return
}
recycler.removeReleasedVolume(pv)
},
},
)
@@ -96,17 +122,50 @@ func NewPersistentVolumeRecycler(kubeClient clientset.Interface, syncPeriod time
return recycler, nil
}
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil {
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
// shouldRecycle checks a volume and returns nil if the volume should be
// recycled right now. Otherwise it returns an error with the reason why it
// should not be recycled.
func (recycler *PersistentVolumeRecycler) shouldRecycle(pv *api.PersistentVolume) error {
if pv.Spec.ClaimRef == nil {
return fmt.Errorf("Volume does not have a reference to claim")
}
if pv.Status.Phase != api.VolumeReleased {
return fmt.Errorf("The volume is not in 'Released' phase")
}
latest, err := recycler.client.GetPersistentVolume(pv.Name)
if err != nil {
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
}
if latest.Status.Phase != api.VolumeReleased {
return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased)
}
// The volume is Released, should we retry recycling?
status, found := recycler.releasedVolumes[pv.Name]
if !found {
// We don't know anything about this volume. The controller has been
// restarted or the volume has been marked as Released by another
// controller. Recycle/delete this volume as if it had just been Released.
glog.V(5).Infof("PersistentVolume[%s] not found in local cache, recycling", pv.Name)
return nil
}
// Check the timestamp
expectedRetry := status.lastAttempt.Add(recycler.syncPeriod)
if time.Now().After(expectedRetry) {
glog.V(5).Infof("PersistentVolume[%s] retrying recycle after timeout", pv.Name)
return nil
}
// It's too early
glog.V(5).Infof("PersistentVolume[%s] skipping recycle, it's too early: now: %v, next retry: %v", pv.Name, time.Now(), expectedRetry)
return fmt.Errorf("Too early after previous failure")
}
func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
glog.V(5).Infof("Recycler: checking PersistentVolume[%s]\n", pv.Name)
// Always load the latest version of the volume
newPV, err := recycler.client.GetPersistentVolume(pv.Name)
if err != nil {
return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
}
pv = newPV
err = recycler.shouldRecycle(pv)
if err == nil {
glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)
// both handleRecycle and handleDelete block until completion
// TODO: allow parallel recycling operations to increase throughput
@@ -125,10 +184,41 @@ func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume
glog.Errorf(errMsg)
return fmt.Errorf(errMsg)
}
return nil
}
glog.V(3).Infof("PersistentVolume[%s] phase %s - skipping: %v", pv.Name, pv.Status.Phase, err)
return nil
}
// handleReleaseFailure evaluates a failed Recycle/Delete operation and updates
// the internal controller state with the new number of attempts and the
// timestamp of the last attempt. Based on the number of failures it returns
// the next phase of the volume (Released / Failed).
func (recycler *PersistentVolumeRecycler) handleReleaseFailure(pv *api.PersistentVolume) api.PersistentVolumePhase {
status, found := recycler.releasedVolumes[pv.Name]
if !found {
// First failure, set retryCount to 0 (will be incremented a few lines below)
status = releasedVolumeStatus{}
}
status.retryCount += 1
if status.retryCount > recycler.maximumRetry {
// This was the last attempt. Remove any internal state and mark the
// volume as Failed.
glog.V(3).Infof("PersistentVolume[%s] failed %d times - marking Failed", pv.Name, status.retryCount)
recycler.removeReleasedVolume(pv)
return api.VolumeFailed
}
status.lastAttempt = time.Now()
recycler.releasedVolumes[pv.Name] = status
return api.VolumeReleased
}
func (recycler *PersistentVolumeRecycler) removeReleasedVolume(pv *api.PersistentVolume) {
delete(recycler.releasedVolumes, pv.Name)
}
func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)
@@ -154,9 +244,12 @@ func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume
if err := volRecycler.Recycle(); err != nil {
glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err)
pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
nextPhase = api.VolumeFailed
nextPhase = recycler.handleReleaseFailure(pv)
} else {
glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
// The volume has been recycled. Remove any internal state to make
// any subsequent bind+recycle cycle work.
recycler.removeReleasedVolume(pv)
nextPhase = api.VolumePending
}
}
@@ -200,9 +293,10 @@ func (recycler *PersistentVolumeRecycler) handleDelete(pv *api.PersistentVolume)
if err != nil {
glog.Errorf("PersistentVolume[%s] failed deletion: %+v", pv.Name, err)
pv.Status.Message = fmt.Sprintf("Deletion error: %s", err)
nextPhase = api.VolumeFailed
nextPhase = recycler.handleReleaseFailure(pv)
} else {
glog.V(5).Infof("PersistentVolume[%s] successfully deleted through plugin\n", pv.Name)
recycler.removeReleasedVolume(pv)
// after successful deletion through the plugin, we can also remove the PV from the cluster
if err := recycler.client.DeletePersistentVolume(pv); err != nil {
return fmt.Errorf("error deleting persistent volume: %+v", err)

View File

@@ -17,16 +17,149 @@ limitations under the License.
package persistentvolume
import (
"fmt"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/client/testing/fake"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/host_path"
)
const (
mySyncPeriod = 2 * time.Second
myMaximumRetry = 3
)
func TestFailedRecycling(t *testing.T) {
pv := &api.PersistentVolume{
pv := preparePV()
mockClient := &mockBinderClient{
volume: pv,
}
// no Init called for pluginMgr and no plugins are available. Volume should fail recycling.
plugMgr := volume.VolumePluginMgr{}
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected non-nil error: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
}
// Use a new volume for the next test
pv = preparePV()
mockClient.volume = pv
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected non-nil error: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
}
}
func TestRecyclingRetry(t *testing.T) {
// Test that the recycler controller retries recycling a volume several times and eventually succeeds
pv := preparePV()
mockClient := &mockBinderClient{
volume: pv,
}
plugMgr := volume.VolumePluginMgr{}
// Use a fake NewRecycler function
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newFailingMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
// Reset a global call counter
failedCallCount = 0
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
syncPeriod: mySyncPeriod,
maximumRetry: myMaximumRetry,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
// All but the last attempt will fail
testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry-1)
// The last attempt should succeed
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Last step: Recycler failed: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumePending {
t.Errorf("Last step: The volume should be Pending, but is %s instead", mockClient.volume.Status.Phase)
}
// Check the cache, it should not have any entry
status, found := recycler.releasedVolumes[pv.Name]
if found {
t.Errorf("Last step: Expected PV to be removed from cache, got %v", status)
}
}
func TestRecyclingRetryAlwaysFail(t *testing.T) {
// Test that the recycler controller retries recycling a volume several times when every attempt fails.
pv := preparePV()
mockClient := &mockBinderClient{
volume: pv,
}
plugMgr := volume.VolumePluginMgr{}
// Use a fake NewRecycler function
plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newAlwaysFailingMockRecycler, volume.VolumeConfig{}), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
// Reset a global call counter
failedCallCount = 0
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
syncPeriod: mySyncPeriod,
maximumRetry: myMaximumRetry,
releasedVolumes: make(map[string]releasedVolumeStatus),
}
// myMaximumRetry recycle attempts will fail
testRecycleFailures(t, recycler, mockClient, pv, myMaximumRetry)
// The volume should be marked Failed after myMaximumRetry attempts
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Last step: Recycler failed: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Last step: The volume should be Failed, but is %s instead", mockClient.volume.Status.Phase)
}
// Check the cache, it should not have any entry
status, found := recycler.releasedVolumes[pv.Name]
if found {
t.Errorf("Last step: Expected PV to be removed from cache, got %v", status)
}
}
func preparePV() *api.PersistentVolume {
return &api.PersistentVolume{
Spec: api.PersistentVolumeSpec{
AccessModes: []api.PersistentVolumeAccessMode{api.ReadWriteOnce},
Capacity: api.ResourceList{
@@ -34,7 +167,7 @@ func TestFailedRecycling(t *testing.T) {
},
PersistentVolumeSource: api.PersistentVolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: "/somepath/data02",
Path: "/tmp/data02",
},
},
PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
@@ -47,36 +180,85 @@ func TestFailedRecycling(t *testing.T) {
Phase: api.VolumeReleased,
},
}
}
mockClient := &mockBinderClient{
volume: pv,
}
// Test that `count` attempts to recycle a PV fail.
func testRecycleFailures(t *testing.T, recycler *PersistentVolumeRecycler, mockClient *mockBinderClient, pv *api.PersistentVolume, count int) {
for i := 1; i <= count; i++ {
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("STEP %d: Recycler faled: %v", i, err)
}
// no Init called for pluginMgr and no plugins are available. Volume should fail recycling.
plugMgr := volume.VolumePluginMgr{}
// Check the status; the volume should still be Released
if mockClient.volume.Status.Phase != api.VolumeReleased {
t.Errorf("STEP %d: The volume should be Released, but is %s instead", i, mockClient.volume.Status.Phase)
}
recycler := &PersistentVolumeRecycler{
kubeClient: fake.NewSimpleClientset(),
client: mockClient,
pluginMgr: plugMgr,
}
// Check the failed volume cache
status, found := recycler.releasedVolumes[pv.Name]
if !found {
t.Errorf("STEP %d: cannot find released volume status", i)
}
if status.retryCount != i {
t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount)
}
err := recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected non-nil error: %v", err)
}
// Call reclaimVolume() too early; it should not increment the retryCount
time.Sleep(mySyncPeriod / 2)
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("STEP %d: Recycler failed: %v", i, err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
}
status, found = recycler.releasedVolumes[pv.Name]
if !found {
t.Errorf("STEP %d: cannot find released volume status", i)
}
if status.retryCount != i {
t.Errorf("STEP %d: Expected nr. of attempts to be %d, got %d", i, i, status.retryCount)
}
pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimDelete
err = recycler.reclaimVolume(pv)
if err != nil {
t.Errorf("Unexpected non-nil error: %v", err)
}
if mockClient.volume.Status.Phase != api.VolumeFailed {
t.Errorf("Expected %s but got %s", api.VolumeFailed, mockClient.volume.Status.Phase)
// Call the next reclaimVolume() only after the full mySyncPeriod has elapsed
time.Sleep(mySyncPeriod / 2)
}
}
func newFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &failingMockRecycler{
path: spec.PersistentVolume.Spec.HostPath.Path,
errorCount: myMaximumRetry - 1, // fail two times and then successfully recycle the volume
}, nil
}
func newAlwaysFailingMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &failingMockRecycler{
path: spec.PersistentVolume.Spec.HostPath.Path,
errorCount: 1000, // always fail
}, nil
}
type failingMockRecycler struct {
path string
// How many times the recycler should fail before returning success.
errorCount int
volume.MetricsNil
}
// Counter of failingMockRecycler.Recycle() calls. A global variable just for
// testing; creating a custom volume plugin to hold this counter would be too
// much code.
var failedCallCount = 0
func (r *failingMockRecycler) GetPath() string {
return r.path
}
func (r *failingMockRecycler) Recycle() error {
failedCallCount += 1
if failedCallCount <= r.errorCount {
return fmt.Errorf("Failing for %d. time", failedCallCount)
}
// return nil means recycle passed
return nil
}

View File

@@ -58,7 +58,7 @@ func TestPersistentVolumeRecycler(t *testing.T) {
binder.Run()
defer binder.Stop()
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, plugins, cloud)
recycler, _ := persistentvolumecontroller.NewPersistentVolumeRecycler(recyclerClient, 30*time.Second, 3, plugins, cloud)
recycler.Run()
defer recycler.Stop()