diff --git a/pkg/controller/persistentvolume/controller.go b/pkg/controller/persistentvolume/controller.go index e61a30baa6..eb499fc89b 100644 --- a/pkg/controller/persistentvolume/controller.go +++ b/pkg/controller/persistentvolume/controller.go @@ -865,7 +865,7 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{}) } // Plugin found - recycler, err := plugin.NewRecycler(spec) + recycler, err := plugin.NewRecycler(volume.Name, spec) if err != nil { // Cannot create recycler strerr := fmt.Sprintf("Failed to create recycler: %v", err) diff --git a/pkg/controller/persistentvolume/framework_test.go b/pkg/controller/persistentvolume/framework_test.go index aea5dc4e8b..6096799dc9 100644 --- a/pkg/controller/persistentvolume/framework_test.go +++ b/pkg/controller/persistentvolume/framework_test.go @@ -882,6 +882,9 @@ type mockVolumePlugin struct { } var _ vol.VolumePlugin = &mockVolumePlugin{} +var _ vol.RecyclableVolumePlugin = &mockVolumePlugin{} +var _ vol.DeletableVolumePlugin = &mockVolumePlugin{} +var _ vol.ProvisionableVolumePlugin = &mockVolumePlugin{} func (plugin *mockVolumePlugin) Init(host vol.VolumeHost) error { return nil @@ -981,7 +984,7 @@ func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) { // Recycler interfaces -func (plugin *mockVolumePlugin) NewRecycler(spec *vol.Spec) (vol.Recycler, error) { +func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec) (vol.Recycler, error) { if len(plugin.recycleCalls) > 0 { // mockVolumePlugin directly implements Recycler interface glog.V(4).Infof("mock plugin NewRecycler called, returning mock recycler") diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index ff2d9c667f..8a6cae457d 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -43,7 +43,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin } } -func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin { +func ProbeRecyclableVolumePlugins(recyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin { return []volume.VolumePlugin{ &hostPathPlugin{ host: nil, @@ -57,7 +57,7 @@ func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volu type hostPathPlugin struct { host volume.VolumeHost // decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error) newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) config volume.VolumeConfig @@ -115,8 +115,8 @@ func (plugin *hostPathPlugin) NewUnmounter(volName string, podUID types.UID) (vo }}, nil } -func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(spec, plugin.host, plugin.config) +func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) } func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { @@ -130,7 +130,7 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu return plugin.newProvisionerFunc(options, plugin.host) } -func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { +func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil") } @@ -141,6 +141,7 @@ func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.Volume host: host, config: config, timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume), + pvName: pvName, }, nil } @@ -221,6 +222,7 @@ type hostPathRecycler struct { config volume.VolumeConfig timeout int64 volume.MetricsNil + pvName string } func (r *hostPathRecycler) GetPath() string { @@ -234,13 +236,12 @@ func (r *hostPathRecycler) Recycle() error { pod := r.config.RecyclerPodTemplate // overrides pod.Spec.ActiveDeadlineSeconds = &r.timeout - pod.GenerateName = "pv-recycler-hostpath-" pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{ HostPath: &api.HostPathVolumeSource{ Path: r.path, }, } - return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient()) } // hostPathProvisioner implements a Provisioner for the HostPath plugin diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index 6a6e9e2f3b..1f4e892275 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -77,7 +77,7 @@ func TestRecycler(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - recycler, err := plug.NewRecycler(spec) + recycler, err := plug.NewRecycler("pv-name", spec) if err != nil { t.Errorf("Failed to make a new Recyler: %v", err) } diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go index 09a6af5017..6b787c6bc3 100644 --- a/pkg/volume/nfs/nfs.go +++ b/pkg/volume/nfs/nfs.go @@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin type nfsPlugin struct { host volume.VolumeHost // decouple creating recyclers by deferring to a function. Allows for easier testing. - newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) config volume.VolumeConfig } @@ -120,8 +120,8 @@ func (plugin *nfsPlugin) newUnmounterInternal(volName string, podUID types.UID, }}, nil } -func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(spec, plugin.host, plugin.config) +func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) { + return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) } // NFS volumes represent a bare host file or directory mount of an NFS export. @@ -250,7 +250,7 @@ func (c *nfsUnmounter) TearDownAt(dir string) error { return nil } -func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { +func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil") } @@ -261,6 +261,7 @@ func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume. host: host, config: volumeConfig, timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume), + pvName: pvName, }, nil } @@ -273,6 +274,7 @@ type nfsRecycler struct { config volume.VolumeConfig timeout int64 volume.MetricsNil + pvName string } func (r *nfsRecycler) GetPath() string { @@ -292,5 +294,5 @@ func (r *nfsRecycler) Recycle() error { Path: r.path, }, } - return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient()) } diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go index 4a045148c2..29b6fdf233 100644 --- a/pkg/volume/nfs/nfs_test.go +++ b/pkg/volume/nfs/nfs_test.go @@ -91,7 +91,7 @@ func TestRecycler(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - recycler, err := plug.NewRecycler(spec) + recycler, err := plug.NewRecycler("pv-name", spec) if err != nil { t.Errorf("Failed to make a new Recyler: %v", err) } @@ -103,7 +103,7 @@ func TestRecycler(t *testing.T) { } } -func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { +func newMockRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { return &mockRecycler{ path: spec.PersistentVolume.Spec.NFS.Path, }, nil diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index c9195f6c28..31a69e1b17 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -101,7 +101,7 @@ type RecyclableVolumePlugin interface { VolumePlugin // NewRecycler creates a new volume.Recycler which knows how to reclaim this resource // after the volume's release from a PersistentVolumeClaim - NewRecycler(spec *Spec) (Recycler, error) + NewRecycler(pvName string, spec *Spec) (Recycler, error) } // DeletableVolumePlugin is an extended interface of VolumePlugin and is used by persistent volumes that want @@ -238,6 +238,9 @@ type VolumeConfig struct { // Example: 5Gi volume x 30s increment = 150s + 30s minimum = 180s ActiveDeadlineSeconds for recycler pod RecyclerTimeoutIncrement int + // PVName is name of the PersistentVolume instance that is being recycled. It is used to generate unique recycler pod name. + PVName string + // OtherAttributes stores config as strings. These strings are opaque to the system and only understood by the binary // hosting the plugin and the plugin itself. OtherAttributes map[string]string diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index b126004f22..b424765d2f 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -199,7 +199,7 @@ func (plugin *FakeVolumePlugin) NewDetacher() (Detacher, error) { return plugin.getFakeVolume(&plugin.Detachers), nil } -func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) { +func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec) (Recycler, error) { return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil } @@ -312,7 +312,7 @@ func (fr *fakeRecycler) GetPath() string { return fr.path } -func NewFakeRecycler(spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) { +func NewFakeRecycler(pvName string, spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath") } diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 7dc454d3cd..c4a532fa16 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -28,31 +28,52 @@ import ( "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" ) -// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume Recyclers. This function will -// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first. -// An attempt to delete a recycler pod is always attempted before returning. -// pod - the pod designed by a volume plugin to recycle the volume +// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume +// Recyclers. This function will save the given Pod to the API and watch it +// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, +// whichever comes first. An attempt to delete a recycler pod is always +// attempted before returning. +// +// In case there is a pod with the same namespace+name already running, this +// function assumes it's an older instance of the recycler pod and watches this +// old pod instead of starting a new one. +// +// pod - the pod designed by a volume plugin to recycle the volume. pod.Name +// will be overwritten with unique name based on PV.Name. // client - kube client for API operations. -func RecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, kubeClient clientset.Interface) error { - return internalRecycleVolumeByWatchingPodUntilCompletion(pod, newRecyclerClient(kubeClient)) +func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface) error { + return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient)) } // same as above func comments, except 'recyclerClient' is a narrower pod API interface to ease testing -func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerClient recyclerClient) error { - glog.V(5).Infof("Creating recycler pod for volume %s\n", pod.Name) - pod, err := recyclerClient.CreatePod(pod) - if err != nil { - return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err) - } +func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, recyclerClient recyclerClient) error { + glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name) + // Generate unique name for the recycler pod - we need to get "already + // exists" error when a previous controller has already started recycling + // the volume. Here we assume that pv.Name is already unique. + pod.Name = "recycler-for-" + pvName + pod.GenerateName = "" + + // Start the pod + _, err := recyclerClient.CreatePod(pod) + if err != nil { + if errors.IsAlreadyExists(err) { + glog.V(5).Infof("old recycler pod %q found for volume", pod.Name) + } else { + return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err) + } + } defer recyclerClient.DeletePod(pod.Name, pod.Namespace) + // Now only the old pod or the new pod run. Watch it until it finishes. stopChannel := make(chan struct{}) defer close(stopChannel) - nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel) + nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) for { watchedPod := nextPod() @@ -65,7 +86,7 @@ func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerCli if watchedPod.Status.Message != "" { return fmt.Errorf(watchedPod.Status.Message) } else { - return fmt.Errorf("Pod failed, pod.Status.Message unknown.") + return fmt.Errorf("pod failed, pod.Status.Message unknown.") } } } @@ -77,7 +98,7 @@ type recyclerClient interface { CreatePod(pod *api.Pod) (*api.Pod, error) GetPod(name, namespace string) (*api.Pod, error) DeletePod(name, namespace string) error - WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod + WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod } func newRecyclerClient(client clientset.Interface) recyclerClient { @@ -103,7 +124,7 @@ func (c *realRecyclerClient) DeletePod(name, namespace string) error { // WatchPod returns a ListWatch for watching a pod. The stopChannel is used // to close the reflector backing the watch. The caller is responsible for derring a close on the channel to // stop the reflector. -func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { +func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod { fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) podLW := &cache.ListWatch{ diff --git a/pkg/volume/util_test.go b/pkg/volume/util_test.go index faccd021ad..aab1be192e 100644 --- a/pkg/volume/util_test.go +++ b/pkg/volume/util_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" ) @@ -29,7 +30,6 @@ func TestRecyclerSuccess(t *testing.T) { client := &mockRecyclerClient{} recycler := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "recycler-test", Namespace: api.NamespaceDefault, }, Status: api.PodStatus{ @@ -37,7 +37,7 @@ func TestRecyclerSuccess(t *testing.T) { }, } - err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client) + err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client) if err != nil { t.Errorf("Unexpected error watching recycler pod: %+v", err) } @@ -50,7 +50,6 @@ func TestRecyclerFailure(t *testing.T) { client := &mockRecyclerClient{} recycler := &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "recycler-test", Namespace: api.NamespaceDefault, }, Status: api.PodStatus{ @@ -59,7 +58,7 @@ func TestRecyclerFailure(t *testing.T) { }, } - err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client) + err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client) if err == nil { t.Fatalf("Expected pod failure but got nil error returned") } @@ -73,14 +72,67 @@ func TestRecyclerFailure(t *testing.T) { } } +func TestRecyclerAlreadyExists(t *testing.T) { + // Test that internalRecycleVolumeByWatchingPodUntilCompletion does not + // start a new recycler when an old one is already running. + + // Old recycler is running and fails with "foo" error message + oldRecycler := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "recycler-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodFailed, + Message: "foo", + }, + } + + // New recycler _would_ succeed if it was run + newRecycler := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "recycler-test", + Namespace: api.NamespaceDefault, + }, + Status: api.PodStatus{ + Phase: api.PodSucceeded, + Message: "bar", + }, + } + + client := &mockRecyclerClient{ + pod: oldRecycler, + } + + err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", newRecycler, client) + if err == nil { + t.Fatalf("Expected pod failure but got nil error returned") + } + + // Check the recycler failed with "foo" error message, i.e. it was the + // old recycler that finished and not the new one. + if err != nil { + if !strings.Contains(err.Error(), "foo") { + t.Errorf("Expected pod.Status.Message %s but got %s", oldRecycler.Status.Message, err) + } + } + if !client.deletedCalled { + t.Errorf("Expected deferred client.Delete to be called on recycler pod") + } +} + type mockRecyclerClient struct { pod *api.Pod deletedCalled bool } func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) { - c.pod = pod - return c.pod, nil + if c.pod == nil { + c.pod = pod + return c.pod, nil + } + // Simulate "already exists" error + return nil, errors.NewAlreadyExists(api.Resource("pods"), pod.Name) } func (c *mockRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) { @@ -96,7 +148,7 @@ func (c *mockRecyclerClient) DeletePod(name, namespace string) error { return nil } -func (c *mockRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod { +func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod { return func() *api.Pod { return c.pod }