Recycling HostPath and NFS impls

pull/6/head
markturansky 2015-05-29 16:34:02 -04:00
parent cb547f4b5c
commit 986cbb56d4
7 changed files with 471 additions and 2 deletions

View File

@ -508,3 +508,11 @@ func GetClient(req *http.Request) string {
}
return "unknown"
}
func ShortenString(str string, n int) string {
if len(str) <= n {
return str
} else {
return str[:n]
}
}

View File

@ -18,23 +18,35 @@ package host_path
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
)
// This is the primary entrypoint for volume plugins.
// Tests covering recycling should not use this func but instead
// use their own array of plugins w/ a custom recyclerFunc as appropriate
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&hostPathPlugin{nil}}
return []volume.VolumePlugin{&hostPathPlugin{nil, newRecycler}}
}
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)) []volume.VolumePlugin {
return []volume.VolumePlugin{&hostPathPlugin{nil, recyclerFunc}}
}
type hostPathPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
}
var _ volume.VolumePlugin = &hostPathPlugin{}
var _ volume.PersistentVolumePlugin = &hostPathPlugin{}
var _ volume.RecyclableVolumePlugin = &hostPathPlugin{}
const (
hostPathPluginName = "kubernetes.io/host-path"
@ -70,6 +82,18 @@ func (plugin *hostPathPlugin) NewCleaner(volName string, podUID types.UID, _ mou
return &hostPath{""}, nil
}
func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
if spec.VolumeSource.HostPath != nil {
return &hostPathRecycler{spec.Name, spec.VolumeSource.HostPath.Path, host}, nil
} else {
return &hostPathRecycler{spec.Name, spec.PersistentVolumeSource.HostPath.Path, host}, nil
}
}
// HostPath volumes represent a bare host file or directory mount.
// The direct at the specified path will be directly exposed to the container.
type hostPath struct {
@ -99,3 +123,64 @@ func (hp *hostPath) TearDown() error {
func (hp *hostPath) TearDownAt(dir string) error {
return fmt.Errorf("TearDownAt() does not make sense for host paths")
}
// hostPathRecycler scrubs a hostPath volume by running "rm -rf" on the volume in a pod
// This recycler only works on a single host cluster and is for testing purposes only.
type hostPathRecycler struct {
name string
path string
host volume.VolumeHost
}
func (r *hostPathRecycler) GetPath() string {
return r.path
}
// Recycler provides methods to reclaim the volume resource.
// A HostPath is recycled by scheduling a pod to run "rm -rf" on the contents of the volume. This is meant for
// development and testing in a single node cluster only.
// Recycle blocks until the pod has completed or any error occurs.
// The scrubber pod's is expected to succeed within 30 seconds when testing localhost.
func (r *hostPathRecycler) Recycle() error {
timeout := int64(30 * time.Second)
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
VolumeSource: api.VolumeSource{
HostPath: &api.HostPathVolumeSource{r.path},
},
},
},
Containers: []api.Container{
{
Name: "scrubber",
Image: "busybox",
// delete the contents of the volume, but not the directory itself
Command: []string{"/bin/sh"},
// the scrubber:
// 1. validates the /scrub directory exists
// 2. creates a text file in the directory to be scrubbed
// 3. performs rm -rf on the directory
// 4. tests to see if the directory is empty
// the pod fails if the error code is returned
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
},
}
return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
}

View File

@ -59,6 +59,47 @@ func TestGetAccessModes(t *testing.T) {
}
}
func TestRecycler(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins([]volume.VolumePlugin{&hostPathPlugin{nil, newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{HostPath: &api.HostPathVolumeSource{Path: "/foo"}}}
plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
t.Errorf("Can't find the plugin by name")
}
recycler, err := plug.NewRecycler(spec)
if err != nil {
t.Error("Failed to make a new Recyler: %v", err)
}
if recycler.GetPath() != spec.PersistentVolumeSource.HostPath.Path {
t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.HostPath.Path, recycler.GetPath())
}
if err := recycler.Recycle(); err != nil {
t.Errorf("Mock Recycler expected to return nil but got %s", err)
}
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolumeSource.HostPath.Path,
}, nil
}
type mockRecycler struct {
path string
host volume.VolumeHost
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}
func TestPlugin(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volume.NewFakeVolumeHost("fake", nil, nil))

View File

@ -19,25 +19,33 @@ package nfs
import (
"fmt"
"os"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/mount"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/golang/glog"
)
// This is the primary entrypoint for volume plugins.
// Tests covering recycling should not use this func but instead
// use their own array of plugins w/ a custom recyclerFunc as appropriate
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&nfsPlugin{nil}}
return []volume.VolumePlugin{&nfsPlugin{nil, newRecycler}}
}
type nfsPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
}
var _ volume.VolumePlugin = &nfsPlugin{}
var _ volume.PersistentVolumePlugin = &nfsPlugin{}
var _ volume.RecyclableVolumePlugin = &nfsPlugin{}
const (
nfsPluginName = "kubernetes.io/nfs"
@ -103,6 +111,28 @@ func (plugin *nfsPlugin) newCleanerInternal(volName string, podUID types.UID, mo
}, nil
}
func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
if spec.VolumeSource.HostPath != nil {
return &nfsRecycler{
name: spec.Name,
server: spec.VolumeSource.NFS.Server,
path: spec.VolumeSource.NFS.Path,
host: host,
}, nil
} else {
return &nfsRecycler{
name: spec.Name,
server: spec.PersistentVolumeSource.NFS.Server,
path: spec.PersistentVolumeSource.NFS.Path,
host: host,
}, nil
}
}
// NFS volumes represent a bare host file or directory mount of an NFS export.
type nfs struct {
volName string
@ -112,6 +142,8 @@ type nfs struct {
readOnly bool
mounter mount.Interface
plugin *nfsPlugin
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error)
}
// SetUp attaches the disk and bind mounts to the volume path.
@ -199,3 +231,66 @@ func (nfsVolume *nfs) TearDownAt(dir string) error {
return nil
}
// nfsRecycler scrubs an NFS volume by running "rm -rf" on the volume in a pod.
type nfsRecycler struct {
name string
server string
path string
host volume.VolumeHost
}
func (r *nfsRecycler) GetPath() string {
return r.path
}
// Recycler provides methods to reclaim the volume resource.
// A NFS volume is recycled by scheduling a pod to run "rm -rf" on the contents of the volume.
// Recycle blocks until the pod has completed or any error occurs.
// The scrubber pod's is expected to succeed within 5 minutes else an error will be returned
func (r *nfsRecycler) Recycle() error {
timeout := int64(300 * time.Second) // 5 minutes
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
GenerateName: "pv-scrubber-" + util.ShortenString(r.name, 44) + "-",
Namespace: api.NamespaceDefault,
},
Spec: api.PodSpec{
ActiveDeadlineSeconds: &timeout,
RestartPolicy: api.RestartPolicyNever,
Volumes: []api.Volume{
{
Name: "vol",
VolumeSource: api.VolumeSource{
NFS: &api.NFSVolumeSource{
Server: r.server,
Path: r.path,
},
},
},
},
Containers: []api.Container{
{
Name: "scrubber",
Image: "busybox",
// delete the contents of the volume, but not the directory itself
Command: []string{"/bin/sh"},
// the scrubber:
// 1. validates the /scrub directory exists
// 2. creates a text file to be scrubbed
// 3. performs rm -rf on the directory
// 4. tests to see if the directory is empty
// the pod fails if the error code is returned
Args: []string{"-c", "test -e /scrub && echo $(date) > /scrub/trash.txt && rm -rf /scrub/* && test -z \"$(ls -A /scrub)\" || exit 1"},
VolumeMounts: []api.VolumeMount{
{
Name: "vol",
MountPath: "/scrub",
},
},
},
},
},
}
return volume.ScrubPodVolumeAndWatchUntilCompletion(pod, r.host.GetKubeClient())
}

View File

@ -60,6 +60,47 @@ func TestGetAccessModes(t *testing.T) {
}
}
func TestRecycler(t *testing.T) {
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins([]volume.VolumePlugin{&nfsPlugin{newRecyclerFunc: newMockRecycler}}, volume.NewFakeVolumeHost("/tmp/fake", nil, nil))
spec := &volume.Spec{PersistentVolumeSource: api.PersistentVolumeSource{NFS: &api.NFSVolumeSource{Path: "/foo"}}}
plug, err := plugMgr.FindRecyclablePluginBySpec(spec)
if err != nil {
t.Errorf("Can't find the plugin by name")
}
recycler, err := plug.NewRecycler(spec)
if err != nil {
t.Error("Failed to make a new Recyler: %v", err)
}
if recycler.GetPath() != spec.PersistentVolumeSource.NFS.Path {
t.Errorf("Expected %s but got %s", spec.PersistentVolumeSource.NFS.Path, recycler.GetPath())
}
if err := recycler.Recycle(); err != nil {
t.Errorf("Mock Recycler expected to return nil but got %s", err)
}
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolumeSource.NFS.Path,
}, nil
}
type mockRecycler struct {
path string
host volume.VolumeHost
}
func (r *mockRecycler) GetPath() string {
return r.path
}
func (r *mockRecycler) Recycle() error {
// return nil means recycle passed
return nil
}
func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeAccessMode) bool {
for _, m := range modes {
if m == mode {

View File

@ -17,7 +17,18 @@ limitations under the License.
package volume
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
func GetAccessModesAsString(modes []api.PersistentVolumeAccessMode) string {
@ -51,3 +62,89 @@ func contains(modes []api.PersistentVolumeAccessMode, mode api.PersistentVolumeA
}
return false
}
// ScrubPodVolumeAndWatchUntilCompletion is intended for use with volume Recyclers. This function will
// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first.
// An attempt to delete a scrubber pod is always attempted before returning.
// pod - the pod designed by a volume plugin to scrub the volume's contents
// client - kube client for API operations.
func ScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, kubeClient client.Interface) error {
return internalScrubPodVolumeAndWatchUntilCompletion(pod, newScrubberClient(kubeClient))
}
// same as above func comments, except 'scrubberClient' is a narrower pod API interface to ease testing
func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient scrubberClient) error {
glog.V(5).Infof("Creating scrubber pod for volume %s\n", pod.Name)
pod, err := scrubberClient.CreatePod(pod)
if err != nil {
return fmt.Errorf("Unexpected error creating a pod to scrub volume %s: %+v\n", pod.Name, err)
}
defer scrubberClient.DeletePod(pod.Name, pod.Namespace)
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion)
for {
watchedPod := nextPod()
if watchedPod.Status.Phase == api.PodSucceeded {
// volume.Recycle() returns nil on success, else error
return nil
}
if watchedPod.Status.Phase == api.PodFailed {
// volume.Recycle() returns nil on success, else error
if watchedPod.Status.Message != "" {
return fmt.Errorf(watchedPod.Status.Message)
} else {
return fmt.Errorf("Pod failed, pod.Status.Message unknown.")
}
}
}
}
// scrubberClient abstracts access to a Pod by providing a narrower interface.
// this makes it easier to mock a client for testing
type scrubberClient interface {
CreatePod(pod *api.Pod) (*api.Pod, error)
GetPod(name, namespace string) (*api.Pod, error)
DeletePod(name, namespace string) error
WatchPod(name, namespace, resourceVersion string) func() *api.Pod
}
func newScrubberClient(client client.Interface) scrubberClient {
return &realScrubberClient{client}
}
type realScrubberClient struct {
client client.Interface
}
func (c *realScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
return c.client.Pods(pod.Namespace).Create(pod)
}
func (c *realScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
return c.client.Pods(namespace).Get(name)
}
func (c *realScrubberClient) DeletePod(name, namespace string) error {
return c.client.Pods(namespace).Delete(name, nil)
}
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod {
fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
podLW := &cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return c.client.Pods(namespace).List(labels.Everything(), fieldSelector)
},
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.client.Pods(namespace).Watch(labels.Everything(), fieldSelector, resourceVersion)
},
}
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).Run()
return func() *api.Pod {
obj := queue.Pop()
return obj.(*api.Pod)
}
}

102
pkg/volume/util_test.go Normal file
View File

@ -0,0 +1,102 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volume
import (
"fmt"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"strings"
)
func TestScrubberSuccess(t *testing.T) {
client := &mockScrubberClient{}
scrubber := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "scrubber-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
Phase: api.PodSucceeded,
},
}
err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client)
if err != nil {
t.Errorf("Unexpected error watching scrubber pod: %+v", err)
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on scrubber pod")
}
}
func TestScrubberFailure(t *testing.T) {
client := &mockScrubberClient{}
scrubber := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "scrubber-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
Phase: api.PodFailed,
Message: "foo",
},
}
err := internalScrubPodVolumeAndWatchUntilCompletion(scrubber, client)
if err == nil {
t.Fatalf("Expected pod failure but got nil error returned")
}
if err != nil {
if !strings.Contains(err.Error(), "foo") {
t.Errorf("Expected pod.Status.Message %s but got %s", scrubber.Status.Message, err)
}
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on scrubber pod")
}
}
type mockScrubberClient struct {
pod *api.Pod
deletedCalled bool
}
func (c *mockScrubberClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
c.pod = pod
return c.pod, nil
}
func (c *mockScrubberClient) GetPod(name, namespace string) (*api.Pod, error) {
if c.pod != nil {
return c.pod, nil
} else {
return nil, fmt.Errorf("pod does not exist")
}
}
func (c *mockScrubberClient) DeletePod(name, namespace string) error {
c.deletedCalled = true
return nil
}
func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod {
return func() *api.Pod {
return c.pod
}
}