Merge pull request #25263 from jsafrane/devel/adopt-recycle-pod

Automatic merge from submit-queue

volume recycler: Don't start a new recycler pod if one already exists.

Recycling is a long duration process and when the recycler controller is restarted in the meantime, it should not start a new recycler pod if there is one already running.

This means that the recycler pod must have deterministic name based on name of the recycled PV, we then get name conflicts when creating the pod.

Two things need to be changed:

- recycler controller and recycler plugins must pass the PV.Name to place, where the pod is created. This is most of the patch and it should be pretty straightforward.

- create recycler pod with deterministic name and check "already exists" error.

When at it, remove useless 'resourceVersion' argument and make log messages starting with lowercase.

There is an unit test to check the behavior + there is an e2e test that checks that regular recycling is not broken (it does not try to run two recycler pods in parallel as the recycler is single-threaded now).
pull/6/head
k8s-merge-robot 2016-05-21 02:28:26 -07:00
commit 62a8394eb4
10 changed files with 125 additions and 43 deletions

View File

@ -865,7 +865,7 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
}
// Plugin found
recycler, err := plugin.NewRecycler(spec)
recycler, err := plugin.NewRecycler(volume.Name, spec)
if err != nil {
// Cannot create recycler
strerr := fmt.Sprintf("Failed to create recycler: %v", err)

View File

@ -882,6 +882,9 @@ type mockVolumePlugin struct {
}
var _ vol.VolumePlugin = &mockVolumePlugin{}
var _ vol.RecyclableVolumePlugin = &mockVolumePlugin{}
var _ vol.DeletableVolumePlugin = &mockVolumePlugin{}
var _ vol.ProvisionableVolumePlugin = &mockVolumePlugin{}
func (plugin *mockVolumePlugin) Init(host vol.VolumeHost) error {
return nil
@ -981,7 +984,7 @@ func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) {
// Recycler interfaces
func (plugin *mockVolumePlugin) NewRecycler(spec *vol.Spec) (vol.Recycler, error) {
func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec) (vol.Recycler, error) {
if len(plugin.recycleCalls) > 0 {
// mockVolumePlugin directly implements Recycler interface
glog.V(4).Infof("mock plugin NewRecycler called, returning mock recycler")

View File

@ -43,7 +43,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin
}
}
func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
func ProbeRecyclableVolumePlugins(recyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error), volumeConfig volume.VolumeConfig) []volume.VolumePlugin {
return []volume.VolumePlugin{
&hostPathPlugin{
host: nil,
@ -57,7 +57,7 @@ func ProbeRecyclableVolumePlugins(recyclerFunc func(spec *volume.Spec, host volu
type hostPathPlugin struct {
host volume.VolumeHost
// decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error)
newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error)
config volume.VolumeConfig
@ -115,8 +115,8 @@ func (plugin *hostPathPlugin) NewUnmounter(volName string, podUID types.UID) (vo
}}, nil
}
func (plugin *hostPathPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host, plugin.config)
func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
}
func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
@ -130,7 +130,7 @@ func (plugin *hostPathPlugin) NewProvisioner(options volume.VolumeOptions) (volu
return plugin.newProvisionerFunc(options, plugin.host)
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil")
}
@ -141,6 +141,7 @@ func newRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.Volume
host: host,
config: config,
timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume),
pvName: pvName,
}, nil
}
@ -221,6 +222,7 @@ type hostPathRecycler struct {
config volume.VolumeConfig
timeout int64
volume.MetricsNil
pvName string
}
func (r *hostPathRecycler) GetPath() string {
@ -234,13 +236,12 @@ func (r *hostPathRecycler) Recycle() error {
pod := r.config.RecyclerPodTemplate
// overrides
pod.Spec.ActiveDeadlineSeconds = &r.timeout
pod.GenerateName = "pv-recycler-hostpath-"
pod.Spec.Volumes[0].VolumeSource = api.VolumeSource{
HostPath: &api.HostPathVolumeSource{
Path: r.path,
},
}
return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient())
return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient())
}
// hostPathProvisioner implements a Provisioner for the HostPath plugin

View File

@ -77,7 +77,7 @@ func TestRecycler(t *testing.T) {
if err != nil {
t.Errorf("Can't find the plugin by name")
}
recycler, err := plug.NewRecycler(spec)
recycler, err := plug.NewRecycler("pv-name", spec)
if err != nil {
t.Errorf("Failed to make a new Recyler: %v", err)
}

View File

@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin
type nfsPlugin struct {
host volume.VolumeHost
// decouple creating recyclers by deferring to a function. Allows for easier testing.
newRecyclerFunc func(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
config volume.VolumeConfig
}
@ -120,8 +120,8 @@ func (plugin *nfsPlugin) newUnmounterInternal(volName string, podUID types.UID,
}}, nil
}
func (plugin *nfsPlugin) NewRecycler(spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(spec, plugin.host, plugin.config)
func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) {
return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
}
// NFS volumes represent a bare host file or directory mount of an NFS export.
@ -250,7 +250,7 @@ func (c *nfsUnmounter) TearDownAt(dir string) error {
return nil
}
func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) {
func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil {
return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil")
}
@ -261,6 +261,7 @@ func newRecycler(spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.
host: host,
config: volumeConfig,
timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume),
pvName: pvName,
}, nil
}
@ -273,6 +274,7 @@ type nfsRecycler struct {
config volume.VolumeConfig
timeout int64
volume.MetricsNil
pvName string
}
func (r *nfsRecycler) GetPath() string {
@ -292,5 +294,5 @@ func (r *nfsRecycler) Recycle() error {
Path: r.path,
},
}
return volume.RecycleVolumeByWatchingPodUntilCompletion(pod, r.host.GetKubeClient())
return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient())
}

View File

@ -91,7 +91,7 @@ func TestRecycler(t *testing.T) {
if err != nil {
t.Errorf("Can't find the plugin by name")
}
recycler, err := plug.NewRecycler(spec)
recycler, err := plug.NewRecycler("pv-name", spec)
if err != nil {
t.Errorf("Failed to make a new Recyler: %v", err)
}
@ -103,7 +103,7 @@ func TestRecycler(t *testing.T) {
}
}
func newMockRecycler(spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
func newMockRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
return &mockRecycler{
path: spec.PersistentVolume.Spec.NFS.Path,
}, nil

View File

@ -101,7 +101,7 @@ type RecyclableVolumePlugin interface {
VolumePlugin
// NewRecycler creates a new volume.Recycler which knows how to reclaim this resource
// after the volume's release from a PersistentVolumeClaim
NewRecycler(spec *Spec) (Recycler, error)
NewRecycler(pvName string, spec *Spec) (Recycler, error)
}
// DeletableVolumePlugin is an extended interface of VolumePlugin and is used by persistent volumes that want
@ -238,6 +238,9 @@ type VolumeConfig struct {
// Example: 5Gi volume x 30s increment = 150s + 30s minimum = 180s ActiveDeadlineSeconds for recycler pod
RecyclerTimeoutIncrement int
// PVName is name of the PersistentVolume instance that is being recycled. It is used to generate unique recycler pod name.
PVName string
// OtherAttributes stores config as strings. These strings are opaque to the system and only understood by the binary
// hosting the plugin and the plugin itself.
OtherAttributes map[string]string

View File

@ -199,7 +199,7 @@ func (plugin *FakeVolumePlugin) NewDetacher() (Detacher, error) {
return plugin.getFakeVolume(&plugin.Detachers), nil
}
func (plugin *FakeVolumePlugin) NewRecycler(spec *Spec) (Recycler, error) {
func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec) (Recycler, error) {
return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil
}
@ -312,7 +312,7 @@ func (fr *fakeRecycler) GetPath() string {
return fr.path
}
func NewFakeRecycler(spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) {
func NewFakeRecycler(pvName string, spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) {
if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath")
}

View File

@ -28,31 +28,52 @@ import (
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
)
// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume Recyclers. This function will
// save the given Pod to the API and watch it until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, whichever comes first.
// An attempt to delete a recycler pod is always attempted before returning.
// pod - the pod designed by a volume plugin to recycle the volume
// RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
// Recyclers. This function will save the given Pod to the API and watch it
// until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded,
// whichever comes first. An attempt to delete a recycler pod is always
// attempted before returning.
//
// In case there is a pod with the same namespace+name already running, this
// function assumes it's an older instance of the recycler pod and watches this
// old pod instead of starting a new one.
//
// pod - the pod designed by a volume plugin to recycle the volume. pod.Name
// will be overwritten with unique name based on PV.Name.
// client - kube client for API operations.
func RecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, kubeClient clientset.Interface) error {
return internalRecycleVolumeByWatchingPodUntilCompletion(pod, newRecyclerClient(kubeClient))
func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface) error {
return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient))
}
// same as above func comments, except 'recyclerClient' is a narrower pod API interface to ease testing
func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerClient recyclerClient) error {
glog.V(5).Infof("Creating recycler pod for volume %s\n", pod.Name)
pod, err := recyclerClient.CreatePod(pod)
func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, recyclerClient recyclerClient) error {
glog.V(5).Infof("creating recycler pod for volume %s\n", pod.Name)
// Generate unique name for the recycler pod - we need to get "already
// exists" error when a previous controller has already started recycling
// the volume. Here we assume that pv.Name is already unique.
pod.Name = "recycler-for-" + pvName
pod.GenerateName = ""
// Start the pod
_, err := recyclerClient.CreatePod(pod)
if err != nil {
if errors.IsAlreadyExists(err) {
glog.V(5).Infof("old recycler pod %q found for volume", pod.Name)
} else {
return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err)
}
}
defer recyclerClient.DeletePod(pod.Name, pod.Namespace)
// Now only the old pod or the new pod run. Watch it until it finishes.
stopChannel := make(chan struct{})
defer close(stopChannel)
nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel)
nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
for {
watchedPod := nextPod()
@ -65,7 +86,7 @@ func internalRecycleVolumeByWatchingPodUntilCompletion(pod *api.Pod, recyclerCli
if watchedPod.Status.Message != "" {
return fmt.Errorf(watchedPod.Status.Message)
} else {
return fmt.Errorf("Pod failed, pod.Status.Message unknown.")
return fmt.Errorf("pod failed, pod.Status.Message unknown.")
}
}
}
@ -77,7 +98,7 @@ type recyclerClient interface {
CreatePod(pod *api.Pod) (*api.Pod, error)
GetPod(name, namespace string) (*api.Pod, error)
DeletePod(name, namespace string) error
WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod
WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod
}
func newRecyclerClient(client clientset.Interface) recyclerClient {
@ -103,7 +124,7 @@ func (c *realRecyclerClient) DeletePod(name, namespace string) error {
// WatchPod returns a ListWatch for watching a pod. The stopChannel is used
// to close the reflector backing the watch. The caller is responsible for derring a close on the channel to
// stop the reflector.
func (c *realRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod {
fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
podLW := &cache.ListWatch{

View File

@ -22,6 +22,7 @@ import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/resource"
)
@ -29,7 +30,6 @@ func TestRecyclerSuccess(t *testing.T) {
client := &mockRecyclerClient{}
recycler := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "recycler-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
@ -37,7 +37,7 @@ func TestRecyclerSuccess(t *testing.T) {
},
}
err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client)
err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client)
if err != nil {
t.Errorf("Unexpected error watching recycler pod: %+v", err)
}
@ -50,7 +50,6 @@ func TestRecyclerFailure(t *testing.T) {
client := &mockRecyclerClient{}
recycler := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "recycler-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
@ -59,7 +58,7 @@ func TestRecyclerFailure(t *testing.T) {
},
}
err := internalRecycleVolumeByWatchingPodUntilCompletion(recycler, client)
err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client)
if err == nil {
t.Fatalf("Expected pod failure but got nil error returned")
}
@ -73,15 +72,68 @@ func TestRecyclerFailure(t *testing.T) {
}
}
func TestRecyclerAlreadyExists(t *testing.T) {
// Test that internalRecycleVolumeByWatchingPodUntilCompletion does not
// start a new recycler when an old one is already running.
// Old recycler is running and fails with "foo" error message
oldRecycler := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "recycler-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
Phase: api.PodFailed,
Message: "foo",
},
}
// New recycler _would_ succeed if it was run
newRecycler := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "recycler-test",
Namespace: api.NamespaceDefault,
},
Status: api.PodStatus{
Phase: api.PodSucceeded,
Message: "bar",
},
}
client := &mockRecyclerClient{
pod: oldRecycler,
}
err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", newRecycler, client)
if err == nil {
t.Fatalf("Expected pod failure but got nil error returned")
}
// Check the recycler failed with "foo" error message, i.e. it was the
// old recycler that finished and not the new one.
if err != nil {
if !strings.Contains(err.Error(), "foo") {
t.Errorf("Expected pod.Status.Message %s but got %s", oldRecycler.Status.Message, err)
}
}
if !client.deletedCalled {
t.Errorf("Expected deferred client.Delete to be called on recycler pod")
}
}
type mockRecyclerClient struct {
pod *api.Pod
deletedCalled bool
}
func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
if c.pod == nil {
c.pod = pod
return c.pod, nil
}
// Simulate "already exists" error
return nil, errors.NewAlreadyExists(api.Resource("pods"), pod.Name)
}
func (c *mockRecyclerClient) GetPod(name, namespace string) (*api.Pod, error) {
if c.pod != nil {
@ -96,7 +148,7 @@ func (c *mockRecyclerClient) DeletePod(name, namespace string) error {
return nil
}
func (c *mockRecyclerClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod {
return func() *api.Pod {
return c.pod
}