mirror of https://github.com/k3s-io/k3s
Recycler controller
parent 986cbb56d4
commit deec5f26cd
@@ -234,6 +234,11 @@ func (s *CMServer) Run(_ []string) error {

 	pvclaimBinder := volumeclaimbinder.NewPersistentVolumeClaimBinder(kubeClient, s.PVClaimBinderSyncPeriod)
 	pvclaimBinder.Run()
+	pvRecycler, err := volumeclaimbinder.NewPersistentVolumeRecycler(kubeClient, s.PVClaimBinderSyncPeriod, ProbeRecyclableVolumePlugins())
+	if err != nil {
+		glog.Fatalf("Failed to start persistent volume recycler: %+v", err)
+	}
+	pvRecycler.Run()

 	if len(s.ServiceAccountKeyFile) > 0 {
 		privateKey, err := serviceaccount.ReadPrivateKey(s.ServiceAccountKeyFile)

@@ -20,6 +20,8 @@ import (
 	// This file exists to force the desired plugin implementations to be linked.
 	// This should probably be part of some configuration fed into the build for a
 	// given binary target.
+
+	//Cloud providers
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/gce"
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/mesos"

@@ -27,4 +29,21 @@ import (
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/ovirt"
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/rackspace"
 	_ "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/vagrant"
+
+	// Volume plugins
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/nfs"
 )
+
+// ProbeRecyclableVolumePlugins collects all persistent volume plugins into an easy to use list.
+func ProbeRecyclableVolumePlugins() []volume.VolumePlugin {
+	allPlugins := []volume.VolumePlugin{}
+
+	// The list of plugins to probe is decided by the kubelet binary, not
+	// by dynamic linking or other "magic". Plugins will be analyzed and
+	// initialized later.
+	allPlugins = append(allPlugins, host_path.ProbeVolumePlugins()...)
+	allPlugins = append(allPlugins, nfs.ProbeVolumePlugins()...)
+	return allPlugins
+}

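A quick orientation note: the probe list above is consumed by a volume.VolumePluginMgr. A minimal sketch of that wiring follows; the InitPlugins call mirrors what NewPersistentVolumeRecycler does later in this commit, and someVolumeHost is a stand-in for any volume.VolumeHost implementation, not a real name:

	// Sketch only: initialize the probed plugins against a VolumeHost, the same
	// pattern NewPersistentVolumeRecycler uses below. someVolumeHost is assumed.
	plugMgr := volume.VolumePluginMgr{}
	if err := plugMgr.InitPlugins(ProbeRecyclableVolumePlugins(), someVolumeHost); err != nil {
		glog.Fatalf("Could not initialize volume plugins: %+v", err)
	}
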
@@ -99,7 +99,10 @@ func (binder *PersistentVolumeClaimBinder) addVolume(obj interface{}) {
 	binder.lock.Lock()
 	defer binder.lock.Unlock()
 	volume := obj.(*api.PersistentVolume)
-	syncVolume(binder.volumeIndex, binder.client, volume)
+	err := syncVolume(binder.volumeIndex, binder.client, volume)
+	if err != nil {
+		glog.Errorf("PVClaimBinder could not add volume %s: %+v", volume.Name, err)
+	}
 }

 func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface{}) {

@@ -107,7 +110,10 @@ func (binder *PersistentVolumeClaimBinder) updateVolume(oldObj, newObj interface
 	defer binder.lock.Unlock()
 	newVolume := newObj.(*api.PersistentVolume)
 	binder.volumeIndex.Update(newVolume)
-	syncVolume(binder.volumeIndex, binder.client, newVolume)
+	err := syncVolume(binder.volumeIndex, binder.client, newVolume)
+	if err != nil {
+		glog.Errorf("PVClaimBinder could not update volume %s: %+v", newVolume.Name, err)
+	}
 }

 func (binder *PersistentVolumeClaimBinder) deleteVolume(obj interface{}) {
@@ -121,18 +127,24 @@ func (binder *PersistentVolumeClaimBinder) addClaim(obj interface{}) {
 	binder.lock.Lock()
 	defer binder.lock.Unlock()
 	claim := obj.(*api.PersistentVolumeClaim)
-	syncClaim(binder.volumeIndex, binder.client, claim)
+	err := syncClaim(binder.volumeIndex, binder.client, claim)
+	if err != nil {
+		glog.Errorf("PVClaimBinder could not add claim %s: %+v", claim.Name, err)
+	}
 }

 func (binder *PersistentVolumeClaimBinder) updateClaim(oldObj, newObj interface{}) {
 	binder.lock.Lock()
 	defer binder.lock.Unlock()
 	newClaim := newObj.(*api.PersistentVolumeClaim)
-	syncClaim(binder.volumeIndex, binder.client, newClaim)
+	err := syncClaim(binder.volumeIndex, binder.client, newClaim)
+	if err != nil {
+		glog.Errorf("PVClaimBinder could not update claim %s: %+v", newClaim.Name, err)
+	}
 }

 func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderClient, volume *api.PersistentVolume) (err error) {
-	glog.V(5).Infof("Synchronizing PersistentVolume[%s]\n", volume.Name)
+	glog.V(5).Infof("Synchronizing PersistentVolume[%s], current phase: %s\n", volume.Name, volume.Status.Phase)

 	// volumes can be in one of the following states:
 	//
@@ -140,12 +152,28 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
 	// VolumeAvailable -- not bound to a claim, but processed at least once and found in this controller's volumeIndex.
 	// VolumeBound -- bound to a claim because volume.Spec.ClaimRef != nil. Claim status may not be correct.
 	// VolumeReleased -- volume.Spec.ClaimRef != nil but the claim has been deleted by the user.
+	// VolumeFailed -- volume.Spec.ClaimRef != nil and the volume failed processing in the recycler
 	currentPhase := volume.Status.Phase
 	nextPhase := currentPhase

 	switch currentPhase {
 	// pending volumes are available only after indexing in order to be matched to claims.
 	case api.VolumePending:
+		if volume.Spec.ClaimRef != nil {
+			// Pending volumes that have a ClaimRef were recently recycled. The Recycler set the phase to VolumePending
+			// to start the volume again at the beginning of this lifecycle.
+			// ClaimRef is the last bind between persistent volume and claim.
+			// The claim has already been deleted by the user at this point
+			oldClaimRef := volume.Spec.ClaimRef
+			volume.Spec.ClaimRef = nil
+			_, err = binderClient.UpdatePersistentVolume(volume)
+			if err != nil {
+				// rollback on error, keep the ClaimRef until we can successfully update the volume
+				volume.Spec.ClaimRef = oldClaimRef
+				return fmt.Errorf("Unexpected error saving PersistentVolume: %+v", err)
+			}
+		}
+
 		_, exists, err := volumeIndex.Get(volume)
 		if err != nil {
 			return err
@@ -170,10 +198,11 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
 				}
 			}
 		}

 	// bound volumes require verification of their bound claims
 	case api.VolumeBound:
 		if volume.Spec.ClaimRef == nil {
-			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume)
+			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
 		} else {
 			_, err := binderClient.GetPersistentVolumeClaim(volume.Spec.ClaimRef.Namespace, volume.Spec.ClaimRef.Name)
 			if err != nil {
@@ -184,12 +213,22 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl
 				}
 			}
 		}

 	// released volumes require recycling
 	case api.VolumeReleased:
 		if volume.Spec.ClaimRef == nil {
 			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
 		} else {
+			// another process is watching for released volumes.
+			// PersistentVolumeReclaimPolicy is set per PersistentVolume
 		}

+	// volumes are removed by processes external to this binder and must be removed from the cluster
+	case api.VolumeFailed:
+		if volume.Spec.ClaimRef == nil {
+			return fmt.Errorf("PersistentVolume[%s] expected to be bound but found nil claimRef: %+v", volume.Name, volume)
+		} else {
+			// TODO: implement Recycle method on plugins
+			glog.V(5).Infof("PersistentVolume[%s] previously failed recycling. Skipping.\n", volume.Name)
+		}
 	}
@@ -198,6 +237,7 @@ func syncVolume(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCl

 	// a change in state will trigger another update through this controller.
+	// each pass through this controller evaluates current phase and decides whether or not to change to the next phase
 	glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", volume.Name, currentPhase, nextPhase)
 	volume, err := binderClient.UpdatePersistentVolumeStatus(volume)
 	if err != nil {
 		// Rollback to previous phase
@@ -254,6 +294,7 @@ func syncClaim(volumeIndex *persistentVolumeOrderedIndex, binderClient binderCli
 	}

 	if volume.Spec.ClaimRef == nil {
+		glog.V(5).Infof("Rebuilding bind on pv.Spec.ClaimRef\n")
 		claimRef, err := api.GetReference(claim)
 		if err != nil {
 			return fmt.Errorf("Unexpected error getting claim reference: %v\n", err)
@@ -318,6 +359,7 @@ func (controller *PersistentVolumeClaimBinder) Stop() {
 type binderClient interface {
 	GetPersistentVolume(name string) (*api.PersistentVolume, error)
 	UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
+	DeletePersistentVolume(volume *api.PersistentVolume) error
 	UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
 	GetPersistentVolumeClaim(namespace, name string) (*api.PersistentVolumeClaim, error)
 	UpdatePersistentVolumeClaim(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error)
@@ -340,6 +382,10 @@ func (c *realBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume)
 	return c.client.PersistentVolumes().Update(volume)
 }

+func (c *realBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
+	return c.client.PersistentVolumes().Delete(volume.Name)
+}
+
 func (c *realBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
 	return c.client.PersistentVolumes().UpdateStatus(volume)
 }

@@ -26,6 +26,8 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/testclient"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume/host_path"
 )

 func TestRunStop(t *testing.T) {
@@ -105,6 +107,7 @@ func TestExampleObjects(t *testing.T) {
 						Path: "/tmp/data02",
 					},
 				},
+				PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
 			},
 		},
 	},
@@ -179,6 +182,7 @@ func TestBindingWithExamples(t *testing.T) {
 	client := &testclient.Fake{ReactFn: testclient.ObjectReaction(o, latest.RESTMapper)}

 	pv, err := client.PersistentVolumes().Get("any")
 	if err != nil {
 		t.Errorf("Unexpected error getting PV from client: %v", err)
 	}
+	pv.Spec.PersistentVolumeReclaimPolicy = api.PersistentVolumeReclaimRecycle
@@ -194,6 +198,15 @@ func TestBindingWithExamples(t *testing.T) {
 		claim: claim,
 	}

+	plugMgr := volume.VolumePluginMgr{}
+	plugMgr.InitPlugins(host_path.ProbeRecyclableVolumePlugins(newMockRecycler), volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
+
+	recycler := &PersistentVolumeRecycler{
+		kubeClient: client,
+		client:     mockClient,
+		pluginMgr:  plugMgr,
+	}
+
 	// adds the volume to the index, making the volume available
 	syncVolume(volumeIndex, mockClient, pv)
 	if pv.Status.Phase != api.VolumeAvailable {
@@ -232,6 +245,31 @@ func TestBindingWithExamples(t *testing.T) {
 	if pv.Status.Phase != api.VolumeReleased {
 		t.Errorf("Expected phase %s but got %s", api.VolumeReleased, pv.Status.Phase)
 	}
+	if pv.Spec.ClaimRef == nil {
+		t.Errorf("Expected non-nil ClaimRef: %+v", pv.Spec)
+	}
+
+	mockClient.volume = pv
+
+	// released volumes with a PersistentVolumeReclaimPolicy (recycle/delete) can have further processing
+	err = recycler.reclaimVolume(pv)
+	if err != nil {
+		t.Errorf("Unexpected error reclaiming volume: %+v", err)
+	}
+	if pv.Status.Phase != api.VolumePending {
+		t.Errorf("Expected phase %s but got %s", api.VolumePending, pv.Status.Phase)
+	}
+
+	// after the recycling changes the phase to Pending, the binder picks up again
+	// to remove any vestiges of binding and make the volume Available again
+	syncVolume(volumeIndex, mockClient, pv)
+
+	if pv.Status.Phase != api.VolumeAvailable {
+		t.Errorf("Expected phase %s but got %s", api.VolumeAvailable, pv.Status.Phase)
+	}
+	if pv.Spec.ClaimRef != nil {
+		t.Errorf("Expected nil ClaimRef: %+v", pv.Spec)
+	}
 }

 type mockBinderClient struct {
@@ -247,6 +285,11 @@ func (c *mockBinderClient) UpdatePersistentVolume(volume *api.PersistentVolume)
 	return volume, nil
 }

+func (c *mockBinderClient) DeletePersistentVolume(volume *api.PersistentVolume) error {
+	c.volume = nil
+	return nil
+}
+
 func (c *mockBinderClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
 	return volume, nil
 }
@@ -266,3 +309,23 @@ func (c *mockBinderClient) UpdatePersistentVolumeClaim(claim *api.PersistentVolu
 func (c *mockBinderClient) UpdatePersistentVolumeClaimStatus(claim *api.PersistentVolumeClaim) (*api.PersistentVolumeClaim, error) {
 	return claim, nil
 }
+
+func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
+	return &mockRecycler{
+		path: spec.PersistentVolumeSource.HostPath.Path,
+	}, nil
+}
+
+type mockRecycler struct {
+	path string
+	host volume.VolumeHost
+}
+
+func (r *mockRecycler) GetPath() string {
+	return r.path
+}
+
+func (r *mockRecycler) Recycle() error {
+	// return nil means recycle passed
+	return nil
+}

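The mock above satisfies the Recycler contract the controller drives in handleRecycle. As a sketch, the shape of that contract, inferred from newMockRecycler's signature and the GetPath/Recycle calls in this commit (the canonical definition lives in pkg/volume and may differ):

	// Assumed shape of volume.Recycler, shown here for orientation only.
	type Recycler interface {
		// GetPath returns the directory the volume exposes for scrubbing.
		GetPath() string
		// Recycle blocks until the volume is scrubbed; a nil error marks success.
		Recycle() error
	}
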
@@ -0,0 +1,231 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumeclaimbinder

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
	"github.com/golang/glog"
)

// PersistentVolumeRecycler is a controller that watches for PersistentVolumes that are released from their claims.
// This controller will Recycle those volumes whose reclaim policy is set to PersistentVolumeReclaimRecycle and make them
// available again for a new claim.
type PersistentVolumeRecycler struct {
	volumeController *framework.Controller
	stopChannel      chan struct{}
	client           recyclerClient
	kubeClient       client.Interface
	pluginMgr        volume.VolumePluginMgr
}

// NewPersistentVolumeRecycler creates a new PersistentVolumeRecycler
func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Duration, plugins []volume.VolumePlugin) (*PersistentVolumeRecycler, error) {
	recyclerClient := NewRecyclerClient(kubeClient)
	recycler := &PersistentVolumeRecycler{
		client:     recyclerClient,
		kubeClient: kubeClient,
	}

	if err := recycler.pluginMgr.InitPlugins(plugins, recycler); err != nil {
		return nil, fmt.Errorf("Could not initialize volume plugins for PVClaimBinder: %+v", err)
	}

	_, volumeController := framework.NewInformer(
		&cache.ListWatch{
			ListFunc: func() (runtime.Object, error) {
				return kubeClient.PersistentVolumes().List(labels.Everything(), fields.Everything())
			},
			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
				return kubeClient.PersistentVolumes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
			},
		},
		&api.PersistentVolume{},
		syncPeriod,
		framework.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				pv := obj.(*api.PersistentVolume)
				recycler.reclaimVolume(pv)
			},
			UpdateFunc: func(oldObj, newObj interface{}) {
				pv := newObj.(*api.PersistentVolume)
				recycler.reclaimVolume(pv)
			},
		},
	)

	recycler.volumeController = volumeController
	return recycler, nil
}

func (recycler *PersistentVolumeRecycler) reclaimVolume(pv *api.PersistentVolume) error {
	if pv.Status.Phase == api.VolumeReleased && pv.Spec.ClaimRef != nil {
		glog.V(5).Infof("Reclaiming PersistentVolume[%s]\n", pv.Name)

		latest, err := recycler.client.GetPersistentVolume(pv.Name)
		if err != nil {
			return fmt.Errorf("Could not find PersistentVolume %s", pv.Name)
		}
		if latest.Status.Phase != api.VolumeReleased {
			return fmt.Errorf("PersistentVolume[%s] phase is %s, expected %s. Skipping.", pv.Name, latest.Status.Phase, api.VolumeReleased)
		}

		// handleRecycle blocks until completion
		// TODO: allow parallel recycling operations to increase throughput
		// TODO implement handleDelete in a separate PR w/ cloud volumes
		switch pv.Spec.PersistentVolumeReclaimPolicy {
		case api.PersistentVolumeReclaimRecycle:
			err = recycler.handleRecycle(pv)
		case api.PersistentVolumeReclaimRetain:
			glog.V(5).Infof("Volume %s is set to retain after release. Skipping.\n", pv.Name)
		default:
			err = fmt.Errorf("No PersistentVolumeReclaimPolicy defined for spec: %+v", pv)
		}
		if err != nil {
			errMsg := fmt.Sprintf("Could not recycle volume spec: %+v", err)
			glog.Errorf(errMsg)
			return fmt.Errorf(errMsg)
		}
	}
	return nil
}

func (recycler *PersistentVolumeRecycler) handleRecycle(pv *api.PersistentVolume) error {
	glog.V(5).Infof("Recycling PersistentVolume[%s]\n", pv.Name)

	currentPhase := pv.Status.Phase
	nextPhase := currentPhase

	spec := volume.NewSpecFromPersistentVolume(pv)
	plugin, err := recycler.pluginMgr.FindRecyclablePluginBySpec(spec)
	if err != nil {
		return fmt.Errorf("Could not find recyclable volume plugin for spec: %+v", err)
	}
	volRecycler, err := plugin.NewRecycler(spec)
	if err != nil {
		return fmt.Errorf("Could not obtain Recycler for spec: %+v", err)
	}
	// blocks until completion
	err = volRecycler.Recycle()
	if err != nil {
		glog.Errorf("PersistentVolume[%s] failed recycling: %+v", pv.Name, err)
		pv.Status.Message = fmt.Sprintf("Recycling error: %s", err)
		nextPhase = api.VolumeFailed
	} else {
		glog.V(5).Infof("PersistentVolume[%s] successfully recycled\n", pv.Name)
		nextPhase = api.VolumePending
	}

	if currentPhase != nextPhase {
		glog.V(5).Infof("PersistentVolume[%s] changing phase from %s to %s\n", pv.Name, currentPhase, nextPhase)
		pv.Status.Phase = nextPhase
		_, err := recycler.client.UpdatePersistentVolumeStatus(pv)
		if err != nil {
			// Rollback to previous phase
			pv.Status.Phase = currentPhase
		}
	}

	return nil
}

// Run starts this recycler's control loops
func (recycler *PersistentVolumeRecycler) Run() {
	glog.V(5).Infof("Starting PersistentVolumeRecycler\n")
	if recycler.stopChannel == nil {
		recycler.stopChannel = make(chan struct{})
		go recycler.volumeController.Run(recycler.stopChannel)
	}
}

// Stop gracefully shuts down this recycler
func (recycler *PersistentVolumeRecycler) Stop() {
	glog.V(5).Infof("Stopping PersistentVolumeRecycler\n")
	if recycler.stopChannel != nil {
		close(recycler.stopChannel)
		recycler.stopChannel = nil
	}
}

// recyclerClient abstracts access to PVs
type recyclerClient interface {
	GetPersistentVolume(name string) (*api.PersistentVolume, error)
	UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error)
	UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error)
}

func NewRecyclerClient(c client.Interface) recyclerClient {
	return &realRecyclerClient{c}
}

type realRecyclerClient struct {
	client client.Interface
}

func (c *realRecyclerClient) GetPersistentVolume(name string) (*api.PersistentVolume, error) {
	return c.client.PersistentVolumes().Get(name)
}

func (c *realRecyclerClient) UpdatePersistentVolume(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
	return c.client.PersistentVolumes().Update(volume)
}

func (c *realRecyclerClient) UpdatePersistentVolumeStatus(volume *api.PersistentVolume) (*api.PersistentVolume, error) {
	return c.client.PersistentVolumes().UpdateStatus(volume)
}

// PersistentVolumeRecycler is host to the volume plugins, but does not actually mount any volumes.
// Because no mounting is performed, most of the VolumeHost methods are not implemented.
func (f *PersistentVolumeRecycler) GetPluginDir(podUID string) string {
	return ""
}

func (f *PersistentVolumeRecycler) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
	return ""
}

func (f *PersistentVolumeRecycler) GetPodPluginDir(podUID types.UID, pluginName string) string {
	return ""
}

func (f *PersistentVolumeRecycler) GetKubeClient() client.Interface {
	return f.kubeClient
}

func (f *PersistentVolumeRecycler) NewWrapperBuilder(spec *volume.Spec, pod *api.Pod, opts volume.VolumeOptions, mounter mount.Interface) (volume.Builder, error) {
	return nil, fmt.Errorf("NewWrapperBuilder not supported by PVClaimBinder's VolumeHost implementation")
}

func (f *PersistentVolumeRecycler) NewWrapperCleaner(spec *volume.Spec, podUID types.UID, mounter mount.Interface) (volume.Cleaner, error) {
	return nil, fmt.Errorf("NewWrapperCleaner not supported by PVClaimBinder's VolumeHost implementation")
}

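handleRecycle above leaves the plugin-side Recycle implementations to a follow-up ("TODO: implement Recycle method on plugins"). As a hedged sketch only, a host_path-style scrubber compatible with the contract could look roughly like this; the package, type, and logic here are hypothetical and not part of the commit:

	package hostpathrecycler // hypothetical package, for illustration only

	import (
		"io/ioutil"
		"os"
		"path/filepath"
	)

	// hostPathRecycler is a hypothetical scrubber: it empties the directory
	// backing a host_path volume, then reports success so the controller can
	// move the volume from Released back to Pending.
	type hostPathRecycler struct {
		path string
	}

	func (r *hostPathRecycler) GetPath() string { return r.path }

	// Recycle blocks until every entry under the path is removed, matching
	// the "blocks until completion" expectation in handleRecycle.
	func (r *hostPathRecycler) Recycle() error {
		entries, err := ioutil.ReadDir(r.path)
		if err != nil {
			return err
		}
		for _, entry := range entries {
			if err := os.RemoveAll(filepath.Join(r.path, entry.Name())); err != nil {
				return err
			}
		}
		return nil
	}
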
@@ -0,0 +1,200 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"

	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"time"
)

// Marked with [Skipped] to skip the test by default (see driver.go);
// the test needs privileged containers, which are disabled by default.
// Run the test with "go run hack/e2e.go ... --ginkgo.focus=PersistentVolume"
var _ = Describe("[Skipped] persistentVolumes", func() {
	// f := NewFramework("pv")

	var c *client.Client
	var ns string

	BeforeEach(func() {
		var err error
		c, err = loadClient()
		Expect(err).NotTo(HaveOccurred())
		ns_, err := createTestingNS("pv", c)
		Expect(err).NotTo(HaveOccurred())
		ns = ns_.Name
	})

	AfterEach(func() {
		By(fmt.Sprintf("Destroying namespace for this suite %v", ns))
		if err := c.Namespaces().Delete(ns); err != nil {
			Failf("Couldn't delete ns %s", err)
		}
	})

It("PersistentVolume", func() {
|
||||
config := VolumeTestConfig{
|
||||
namespace: ns,
|
||||
prefix: "nfs",
|
||||
serverImage: "gcr.io/google_containers/volume-nfs",
|
||||
serverPorts: []int{2049},
|
||||
}
|
||||
|
||||
defer func() {
|
||||
volumeTestCleanup(c, config)
|
||||
}()
|
||||
|
||||
pod := startVolumeServer(c, config)
|
||||
serverIP := pod.Status.PodIP
|
||||
Logf("NFS server IP address: %v", serverIP)
|
||||
|
||||
pv := makePersistentVolume(serverIP)
|
||||
pvc := makePersistentVolumeClaim(ns)
|
||||
|
||||
Logf("Creating PersistentVolume using NFS")
|
||||
pv, err := c.PersistentVolumes().Create(pv)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
Logf("Creating PersistentVolumeClaim")
|
||||
pvc, err = c.PersistentVolumeClaims(ns).Create(pvc)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// allow the binder a chance to catch up
|
||||
time.Sleep(20 * time.Second)
|
||||
|
||||
pv, err = c.PersistentVolumes().Get(pv.Name)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if pv.Spec.ClaimRef == nil {
|
||||
Failf("Expected PersistentVolume to be bound, but got nil ClaimRef: %+v", pv)
|
||||
}
|
||||
|
||||
Logf("Deleting PersistentVolumeClaim to trigger PV Recycling")
|
||||
err = c.PersistentVolumeClaims(ns).Delete(pvc.Name)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// allow the recycler a chance to catch up
|
||||
time.Sleep(120 * time.Second)
|
||||
|
||||
pv, err = c.PersistentVolumes().Get(pv.Name)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if pv.Spec.ClaimRef != nil {
|
||||
Failf("Expected PersistentVolume to be unbound, but found non-nil ClaimRef: %+v", pv)
|
||||
}
|
||||
|
||||
// Now check that index.html from the NFS server was really removed
|
||||
checkpod := makeCheckPod(ns, serverIP)
|
||||
testContainerOutputInNamespace("the volume was scrubbed", c, checkpod, []string{"index.html does not exist"}, ns)
|
||||
|
||||
})
|
||||
})
|
||||
|
||||
func makePersistentVolume(serverIP string) *api.PersistentVolume {
	return &api.PersistentVolume{
		ObjectMeta: api.ObjectMeta{
			Name: "nfs-" + string(util.NewUUID()),
		},
		Spec: api.PersistentVolumeSpec{
			PersistentVolumeReclaimPolicy: api.PersistentVolumeReclaimRecycle,
			Capacity: api.ResourceList{
				api.ResourceName(api.ResourceStorage): resource.MustParse("2Gi"),
			},
			PersistentVolumeSource: api.PersistentVolumeSource{
				NFS: &api.NFSVolumeSource{
					Server:   serverIP,
					Path:     "/",
					ReadOnly: false,
				},
			},
			AccessModes: []api.PersistentVolumeAccessMode{
				api.ReadWriteOnce,
				api.ReadOnlyMany,
				api.ReadWriteMany,
			},
		},
	}
}

func makePersistentVolumeClaim(ns string) *api.PersistentVolumeClaim {
	return &api.PersistentVolumeClaim{
		ObjectMeta: api.ObjectMeta{
			Name:      "pvc-" + string(util.NewUUID()),
			Namespace: ns,
		},
		Spec: api.PersistentVolumeClaimSpec{
			AccessModes: []api.PersistentVolumeAccessMode{
				api.ReadWriteOnce,
				api.ReadOnlyMany,
				api.ReadWriteMany,
			},
			Resources: api.ResourceRequirements{
				Requests: api.ResourceList{
					api.ResourceName(api.ResourceStorage): resource.MustParse("1Gi"),
				},
			},
		},
	}
}

func makeCheckPod(ns string, nfsserver string) *api.Pod {
	// Prepare pod that mounts the NFS volume again and
	// checks that /mnt/index.html was scrubbed there
	return &api.Pod{
		TypeMeta: api.TypeMeta{
			Kind:       "Pod",
			APIVersion: "v1beta3",
		},
		ObjectMeta: api.ObjectMeta{
			Name: "checker-" + string(util.NewUUID()),
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{
					Name:    "checker-" + string(util.NewUUID()),
					Image:   "busybox",
					Command: []string{"/bin/sh"},
					Args:    []string{"-c", "test -e /mnt/index.html || echo 'index.html does not exist'"},
					VolumeMounts: []api.VolumeMount{
						{
							Name:      "nfs-volume",
							MountPath: "/mnt",
						},
					},
				},
			},
			Volumes: []api.Volume{
				{
					Name: "nfs-volume",
					VolumeSource: api.VolumeSource{
						NFS: &api.NFSVolumeSource{
							Server: nfsserver,
							Path:   "/",
						},
					},
				},
			},
		},
	}
}
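The fixed 20s/120s sleeps in the test above are timing-sensitive. A plain-Go polling loop, sketched below using only the `c` and `pv` names already present in the test (the durations are arbitrary assumptions), would make the binder check less flaky:

	// Sketch: poll for the bind instead of sleeping a fixed 20 seconds.
	deadline := time.Now().Add(2 * time.Minute)
	for {
		latest, err := c.PersistentVolumes().Get(pv.Name)
		if err == nil && latest.Spec.ClaimRef != nil {
			break // bound
		}
		if time.Now().After(deadline) {
			Failf("PersistentVolume %s was never bound to a claim", pv.Name)
			break
		}
		time.Sleep(2 * time.Second)
	}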