diff --git a/pkg/controller/volume/persistentvolume/controller.go b/pkg/controller/volume/persistentvolume/controller.go index 878ce16794..3cb7815c20 100644 --- a/pkg/controller/volume/persistentvolume/controller.go +++ b/pkg/controller/volume/persistentvolume/controller.go @@ -913,7 +913,6 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum // Update the status _, err = ctrl.updateVolumePhase(newVol, api.VolumeAvailable, "") return err - } // reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and @@ -996,7 +995,8 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{}) } // Plugin found - recycler, err := plugin.NewRecycler(volume.Name, spec) + recorder := ctrl.newRecyclerEventRecorder(volume) + recycler, err := plugin.NewRecycler(volume.Name, spec, recorder) if err != nil { // Cannot create recycler strerr := fmt.Sprintf("Failed to create recycler: %v", err) @@ -1024,6 +1024,8 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{}) } glog.V(2).Infof("volume %q recycled", volume.Name) + // Send an event + ctrl.eventRecorder.Event(volume, api.EventTypeNormal, "VolumeRecycled", "Volume recycled") // Make the volume available again if err = ctrl.unbindVolume(volume); err != nil { // Oops, could not save the volume and therefore the controller will @@ -1381,6 +1383,14 @@ func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, } } +// newRecyclerEventRecorder returns a RecycleEventRecorder that sends all events +// to given volume. +func (ctrl *PersistentVolumeController) newRecyclerEventRecorder(volume *api.PersistentVolume) vol.RecycleEventRecorder { + return func(eventtype, message string) { + ctrl.eventRecorder.Eventf(volume, eventtype, "RecyclerPod", "Recycler pod: %s", message) + } +} + // findProvisionablePlugin finds a provisioner plugin for a given claim. // It returns either the provisioning plugin or nil when an external // provisioner is requested. diff --git a/pkg/controller/volume/persistentvolume/framework_test.go b/pkg/controller/volume/persistentvolume/framework_test.go index 6430b311b6..fe7daea5ba 100644 --- a/pkg/controller/volume/persistentvolume/framework_test.go +++ b/pkg/controller/volume/persistentvolume/framework_test.go @@ -1205,7 +1205,7 @@ func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) { // Recycler interfaces -func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec) (vol.Recycler, error) { +func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec, eventRecorder vol.RecycleEventRecorder) (vol.Recycler, error) { if len(plugin.recycleCalls) > 0 { // mockVolumePlugin directly implements Recycler interface glog.V(4).Infof("mock plugin NewRecycler called, returning mock recycler") diff --git a/pkg/volume/host_path/host_path.go b/pkg/volume/host_path/host_path.go index 40a109a08a..28de189136 100644 --- a/pkg/volume/host_path/host_path.go +++ b/pkg/volume/host_path/host_path.go @@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin type hostPathPlugin struct { host volume.VolumeHost // decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing. - newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newRecyclerFunc func(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error) newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error) config volume.VolumeConfig @@ -112,8 +112,8 @@ func (plugin *hostPathPlugin) NewUnmounter(volName string, podUID types.UID) (vo }}, nil } -func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) +func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder) (volume.Recycler, error) { + return plugin.newRecyclerFunc(pvName, spec, eventRecorder, plugin.host, plugin.config) } func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) { @@ -142,18 +142,19 @@ func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string) return volume.NewSpecFromVolume(hostPathVolume), nil } -func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { +func newRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil") } path := spec.PersistentVolume.Spec.HostPath.Path return &hostPathRecycler{ - name: spec.Name(), - path: path, - host: host, - config: config, - timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume), - pvName: pvName, + name: spec.Name(), + path: path, + host: host, + config: config, + timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume), + pvName: pvName, + eventRecorder: eventRecorder, }, nil } @@ -234,7 +235,8 @@ type hostPathRecycler struct { config volume.VolumeConfig timeout int64 volume.MetricsNil - pvName string + pvName string + eventRecorder volume.RecycleEventRecorder } func (r *hostPathRecycler) GetPath() string { @@ -253,7 +255,7 @@ func (r *hostPathRecycler) Recycle() error { Path: r.path, }, } - return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient(), r.eventRecorder) } // hostPathProvisioner implements a Provisioner for the HostPath plugin diff --git a/pkg/volume/host_path/host_path_test.go b/pkg/volume/host_path/host_path_test.go index acf07bc750..cf352829df 100644 --- a/pkg/volume/host_path/host_path_test.go +++ b/pkg/volume/host_path/host_path_test.go @@ -78,7 +78,7 @@ func TestRecycler(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - recycler, err := plug.NewRecycler("pv-name", spec) + recycler, err := plug.NewRecycler("pv-name", spec, nil) if err != nil { t.Errorf("Failed to make a new Recyler: %v", err) } diff --git a/pkg/volume/nfs/nfs.go b/pkg/volume/nfs/nfs.go index 4ca95781b6..a7eb34a000 100644 --- a/pkg/volume/nfs/nfs.go +++ b/pkg/volume/nfs/nfs.go @@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin type nfsPlugin struct { host volume.VolumeHost // decouple creating recyclers by deferring to a function. Allows for easier testing. - newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) + newRecyclerFunc func(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) config volume.VolumeConfig } @@ -132,8 +132,8 @@ func (plugin *nfsPlugin) newUnmounterInternal(volName string, podUID types.UID, }}, nil } -func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) { - return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config) +func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder) (volume.Recycler, error) { + return plugin.newRecyclerFunc(pvName, spec, eventRecorder, plugin.host, plugin.config) } func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) { @@ -274,18 +274,19 @@ func (c *nfsUnmounter) TearDownAt(dir string) error { return nil } -func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { +func newRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil { return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil") } return &nfsRecycler{ - name: spec.Name(), - server: spec.PersistentVolume.Spec.NFS.Server, - path: spec.PersistentVolume.Spec.NFS.Path, - host: host, - config: volumeConfig, - timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume), - pvName: pvName, + name: spec.Name(), + server: spec.PersistentVolume.Spec.NFS.Server, + path: spec.PersistentVolume.Spec.NFS.Path, + host: host, + config: volumeConfig, + timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume), + pvName: pvName, + eventRecorder: eventRecorder, }, nil } @@ -298,7 +299,8 @@ type nfsRecycler struct { config volume.VolumeConfig timeout int64 volume.MetricsNil - pvName string + pvName string + eventRecorder volume.RecycleEventRecorder } func (r *nfsRecycler) GetPath() string { @@ -318,7 +320,7 @@ func (r *nfsRecycler) Recycle() error { Path: r.path, }, } - return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient()) + return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient(), r.eventRecorder) } func getVolumeSource(spec *volume.Spec) (*api.NFSVolumeSource, bool, error) { diff --git a/pkg/volume/nfs/nfs_test.go b/pkg/volume/nfs/nfs_test.go index b140e406c3..204b9920c8 100644 --- a/pkg/volume/nfs/nfs_test.go +++ b/pkg/volume/nfs/nfs_test.go @@ -91,7 +91,7 @@ func TestRecycler(t *testing.T) { if err != nil { t.Errorf("Can't find the plugin by name") } - recycler, err := plug.NewRecycler("pv-name", spec) + recycler, err := plug.NewRecycler("pv-name", spec, nil) if err != nil { t.Errorf("Failed to make a new Recyler: %v", err) } @@ -103,7 +103,7 @@ func TestRecycler(t *testing.T) { } } -func newMockRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { +func newMockRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) { return &mockRecycler{ path: spec.PersistentVolume.Spec.NFS.Path, }, nil diff --git a/pkg/volume/plugins.go b/pkg/volume/plugins.go index 8dd1b6a2e2..6e87cd225f 100644 --- a/pkg/volume/plugins.go +++ b/pkg/volume/plugins.go @@ -124,9 +124,12 @@ type PersistentVolumePlugin interface { // again to new claims type RecyclableVolumePlugin interface { VolumePlugin - // NewRecycler creates a new volume.Recycler which knows how to reclaim - // this resource after the volume's release from a PersistentVolumeClaim - NewRecycler(pvName string, spec *Spec) (Recycler, error) + // NewRecycler creates a new volume.Recycler which knows how to reclaim this + // resource after the volume's release from a PersistentVolumeClaim. The + // recycler will use the provided recorder to write any events that might be + // interesting to user. It's expected that caller will pass these events to + // the PV being recycled. + NewRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) (Recycler, error) } // DeletableVolumePlugin is an extended interface of VolumePlugin and is used diff --git a/pkg/volume/testing/testing.go b/pkg/volume/testing/testing.go index 375c1a0bee..f861ab33b1 100644 --- a/pkg/volume/testing/testing.go +++ b/pkg/volume/testing/testing.go @@ -273,7 +273,7 @@ func (plugin *FakeVolumePlugin) GetNewDetacherCallCount() int { return plugin.NewDetacherCallCount } -func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec) (Recycler, error) { +func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) (Recycler, error) { return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil } @@ -457,7 +457,7 @@ func (fr *fakeRecycler) GetPath() string { return fr.path } -func NewFakeRecycler(pvName string, spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) { +func NewFakeRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder, host VolumeHost, config VolumeConfig) (Recycler, error) { if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil { return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath") } diff --git a/pkg/volume/util.go b/pkg/volume/util.go index 19a3afae9c..21d222f3bf 100644 --- a/pkg/volume/util.go +++ b/pkg/volume/util.go @@ -19,13 +19,10 @@ package volume import ( "fmt" "reflect" - "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/watch" "hash/fnv" @@ -39,6 +36,8 @@ import ( "k8s.io/kubernetes/pkg/util/sets" ) +type RecycleEventRecorder func(eventtype, message string) + // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume // Recyclers. This function will save the given Pod to the API and watch it // until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded, @@ -52,8 +51,8 @@ import ( // pod - the pod designed by a volume plugin to recycle the volume. pod.Name // will be overwritten with unique name based on PV.Name. // client - kube client for API operations. -func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface) error { - return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient)) +func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error { + return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder)) } // same as above func comments, except 'recyclerClient' is a narrower pod API @@ -67,34 +66,61 @@ func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.P pod.Name = "recycler-for-" + pvName pod.GenerateName = "" + stopChannel := make(chan struct{}) + defer close(stopChannel) + podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) + if err != nil { + glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err) + return err + } + // Start the pod - _, err := recyclerClient.CreatePod(pod) + _, err = recyclerClient.CreatePod(pod) if err != nil { if errors.IsAlreadyExists(err) { glog.V(5).Infof("old recycler pod %q found for volume", pod.Name) } else { - return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err) + return fmt.Errorf("unexpected error creating recycler pod: %+v\n", err) } } defer recyclerClient.DeletePod(pod.Name, pod.Namespace) - // Now only the old pod or the new pod run. Watch it until it finishes. - stopChannel := make(chan struct{}) - defer close(stopChannel) - nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel) - + // Now only the old pod or the new pod run. Watch it until it finishes + // and send all events on the pod to the PV for { - watchedPod := nextPod() - if watchedPod.Status.Phase == api.PodSucceeded { - // volume.Recycle() returns nil on success, else error - return nil - } - if watchedPod.Status.Phase == api.PodFailed { - // volume.Recycle() returns nil on success, else error - if watchedPod.Status.Message != "" { - return fmt.Errorf(watchedPod.Status.Message) - } else { - return fmt.Errorf("pod failed, pod.Status.Message unknown.") + event := <-podCh + switch event.Object.(type) { + case *api.Pod: + // POD changed + pod := event.Object.(*api.Pod) + glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase) + switch event.Type { + case watch.Added, watch.Modified: + if pod.Status.Phase == api.PodSucceeded { + // Recycle succeeded. + return nil + } + if pod.Status.Phase == api.PodFailed { + if pod.Status.Message != "" { + return fmt.Errorf(pod.Status.Message) + } else { + return fmt.Errorf("pod failed, pod.Status.Message unknown.") + } + } + + case watch.Deleted: + return fmt.Errorf("recycler pod was deleted") + + case watch.Error: + return fmt.Errorf("recycler pod watcher failed") + } + + case *api.Event: + // Event received + podEvent := event.Object.(*api.Event) + glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message) + if event.Type == watch.Added { + recyclerClient.Event(podEvent.Type, podEvent.Message) } } } @@ -106,15 +132,24 @@ type recyclerClient interface { CreatePod(pod *api.Pod) (*api.Pod, error) GetPod(name, namespace string) (*api.Pod, error) DeletePod(name, namespace string) error - WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod + // WatchPod returns a ListWatch for watching a pod. The stopChannel is used + // to close the reflector backing the watch. The caller is responsible for + // derring a close on the channel to stop the reflector. + WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) + // Event sends an event to the volume that is being recycled. + Event(eventtype, message string) } -func newRecyclerClient(client clientset.Interface) recyclerClient { - return &realRecyclerClient{client} +func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient { + return &realRecyclerClient{ + client, + recorder, + } } type realRecyclerClient struct { - client clientset.Interface + client clientset.Interface + recorder RecycleEventRecorder } func (c *realRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) { @@ -129,28 +164,60 @@ func (c *realRecyclerClient) DeletePod(name, namespace string) error { return c.client.Core().Pods(namespace).Delete(name, nil) } -// WatchPod returns a ListWatch for watching a pod. The stopChannel is used -// to close the reflector backing the watch. The caller is responsible for -// derring a close on the channel to stop the reflector. -func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod { - fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) +func (c *realRecyclerClient) Event(eventtype, message string) { + c.recorder(eventtype, message) +} - podLW := &cache.ListWatch{ - ListFunc: func(options api.ListOptions) (runtime.Object, error) { - options.FieldSelector = fieldSelector - return c.client.Core().Pods(namespace).List(options) - }, - WatchFunc: func(options api.ListOptions) (watch.Interface, error) { - options.FieldSelector = fieldSelector - return c.client.Core().Pods(namespace).Watch(options) - }, +func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) { + podSelector, _ := fields.ParseSelector("metadata.name=" + name) + options := api.ListOptions{ + FieldSelector: podSelector, + Watch: true, } - queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) - cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel) - return func() *api.Pod { - return cache.Pop(queue).(*api.Pod) + podWatch, err := c.client.Core().Pods(namespace).Watch(options) + if err != nil { + return nil, err } + + eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name) + eventWatch, err := c.client.Core().Events(namespace).Watch(api.ListOptions{ + FieldSelector: eventSelector, + Watch: true, + }) + if err != nil { + podWatch.Stop() + return nil, err + } + + eventCh := make(chan watch.Event, 0) + + go func() { + defer eventWatch.Stop() + defer podWatch.Stop() + defer close(eventCh) + + for { + select { + case _ = <-stopChannel: + return + + case podEvent, ok := <-podWatch.ResultChan(): + if !ok { + return + } + eventCh <- podEvent + + case eventEvent, ok := <-eventWatch.ResultChan(): + if !ok { + return + } + eventCh <- eventEvent + } + } + }() + + return eventCh, nil } // CalculateTimeoutForVolume calculates time for a Recycler pod to complete a diff --git a/pkg/volume/util_test.go b/pkg/volume/util_test.go index e992705b24..4fd71f373e 100644 --- a/pkg/volume/util_test.go +++ b/pkg/volume/util_test.go @@ -24,106 +24,195 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/watch" ) -func TestRecyclerSuccess(t *testing.T) { - client := &mockRecyclerClient{} - recycler := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Namespace: api.NamespaceDefault, - }, - Status: api.PodStatus{ - Phase: api.PodSucceeded, - }, - } +type testcase struct { + // Input of the test + name string + existingPod *api.Pod + createPod *api.Pod + // eventSequence is list of events that are simulated during recycling. It + // can be either event generated by a recycler pod or a state change of + // the pod. (see newPodEvent and newEvent below). + eventSequence []watch.Event - err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client) - if err != nil { - t.Errorf("Unexpected error watching recycler pod: %+v", err) - } - if !client.deletedCalled { - t.Errorf("Expected deferred client.Delete to be called on recycler pod") + // Expected output. + // expectedEvents is list of events that were sent to the volume that was + // recycled. + expectedEvents []mockEvent + expectedError string +} + +func newPodEvent(eventtype watch.EventType, name string, phase api.PodPhase, message string) watch.Event { + return watch.Event{ + Type: eventtype, + Object: newPod(name, phase, message), } } -func TestRecyclerFailure(t *testing.T) { - client := &mockRecyclerClient{} - recycler := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Namespace: api.NamespaceDefault, +func newEvent(eventtype, message string) watch.Event { + return watch.Event{ + Type: watch.Added, + Object: &api.Event{ + ObjectMeta: api.ObjectMeta{ + Namespace: api.NamespaceDefault, + }, + Reason: "MockEvent", + Message: message, + Type: eventtype, }, - Status: api.PodStatus{ - Phase: api.PodFailed, - Message: "foo", - }, - } - - err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client) - if err == nil { - t.Fatalf("Expected pod failure but got nil error returned") - } - if err != nil { - if !strings.Contains(err.Error(), "foo") { - t.Errorf("Expected pod.Status.Message %s but got %s", recycler.Status.Message, err) - } - } - if !client.deletedCalled { - t.Errorf("Expected deferred client.Delete to be called on recycler pod") } } -func TestRecyclerAlreadyExists(t *testing.T) { - // Test that internalRecycleVolumeByWatchingPodUntilCompletion does not - // start a new recycler when an old one is already running. - - // Old recycler is running and fails with "foo" error message - oldRecycler := &api.Pod{ +func newPod(name string, phase api.PodPhase, message string) *api.Pod { + return &api.Pod{ ObjectMeta: api.ObjectMeta{ - Name: "recycler-test", Namespace: api.NamespaceDefault, + Name: name, }, Status: api.PodStatus{ - Phase: api.PodFailed, - Message: "foo", + Phase: phase, + Message: message, + }, + } +} + +func TestRecyclerPod(t *testing.T) { + tests := []testcase{ + { + // Test recycler success with some events + name: "RecyclerSuccess", + createPod: newPod("podRecyclerSuccess", api.PodPending, ""), + eventSequence: []watch.Event{ + // Pod gets Running and Succeeded + newPodEvent(watch.Added, "podRecyclerSuccess", api.PodPending, ""), + newEvent(api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerSuccess to 127.0.0.1"), + newEvent(api.EventTypeNormal, "pulling image \"gcr.io/google_containers/busybox\""), + newEvent(api.EventTypeNormal, "Successfully pulled image \"gcr.io/google_containers/busybox\""), + newEvent(api.EventTypeNormal, "Created container with docker id 83d929aeac82"), + newEvent(api.EventTypeNormal, "Started container with docker id 83d929aeac82"), + newPodEvent(watch.Modified, "podRecyclerSuccess", api.PodRunning, ""), + newPodEvent(watch.Modified, "podRecyclerSuccess", api.PodSucceeded, ""), + }, + expectedEvents: []mockEvent{ + {api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerSuccess to 127.0.0.1"}, + {api.EventTypeNormal, "pulling image \"gcr.io/google_containers/busybox\""}, + {api.EventTypeNormal, "Successfully pulled image \"gcr.io/google_containers/busybox\""}, + {api.EventTypeNormal, "Created container with docker id 83d929aeac82"}, + {api.EventTypeNormal, "Started container with docker id 83d929aeac82"}, + }, + expectedError: "", + }, + { + // Test recycler failure with some events + name: "RecyclerFailure", + createPod: newPod("podRecyclerFailure", api.PodPending, ""), + eventSequence: []watch.Event{ + // Pod gets Running and Succeeded + newPodEvent(watch.Added, "podRecyclerFailure", api.PodPending, ""), + newEvent(api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerFailure to 127.0.0.1"), + newEvent(api.EventTypeWarning, "Unable to mount volumes for pod \"recycler-for-podRecyclerFailure_default(3c9809e5-347c-11e6-a79b-3c970e965218)\": timeout expired waiting for volumes to attach/mount"), + newEvent(api.EventTypeWarning, "Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod \"recycler-for-podRecyclerFailure\"/\"default\". list of unattached/unmounted"), + newPodEvent(watch.Modified, "podRecyclerFailure", api.PodRunning, ""), + newPodEvent(watch.Modified, "podRecyclerFailure", api.PodFailed, "Pod was active on the node longer than specified deadline"), + }, + expectedEvents: []mockEvent{ + {api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerFailure to 127.0.0.1"}, + {api.EventTypeWarning, "Unable to mount volumes for pod \"recycler-for-podRecyclerFailure_default(3c9809e5-347c-11e6-a79b-3c970e965218)\": timeout expired waiting for volumes to attach/mount"}, + {api.EventTypeWarning, "Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod \"recycler-for-podRecyclerFailure\"/\"default\". list of unattached/unmounted"}, + }, + expectedError: "Pod was active on the node longer than specified deadline", + }, + { + // Recycler pod gets deleted + name: "RecyclerDeleted", + createPod: newPod("podRecyclerDeleted", api.PodPending, ""), + eventSequence: []watch.Event{ + // Pod gets Running and Succeeded + newPodEvent(watch.Added, "podRecyclerDeleted", api.PodPending, ""), + newEvent(api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerDeleted to 127.0.0.1"), + newPodEvent(watch.Deleted, "podRecyclerDeleted", api.PodPending, ""), + }, + expectedEvents: []mockEvent{ + {api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerDeleted to 127.0.0.1"}, + }, + expectedError: "recycler pod was deleted", + }, + { + // Another recycler pod is already running + name: "RecyclerRunning", + existingPod: newPod("podOldRecycler", api.PodRunning, ""), + createPod: newPod("podNewRecycler", api.PodFailed, "mock message"), + eventSequence: []watch.Event{ + // Old pod succeeds + newPodEvent(watch.Modified, "podOldRecycler", api.PodSucceeded, ""), + }, + // No error = old pod succeeded. If the new pod was used, there + // would be error with "mock message". + expectedError: "", + }, + { + // Another recycler pod is already running and fails + name: "FailedRecyclerRunning", + existingPod: newPod("podOldRecycler", api.PodRunning, ""), + createPod: newPod("podNewRecycler", api.PodFailed, "mock message"), + eventSequence: []watch.Event{ + // Old pod failure + newPodEvent(watch.Modified, "podOldRecycler", api.PodFailed, "Pod was active on the node longer than specified deadline"), + }, + // If the new pod was used, there would be error with "mock message". + expectedError: "Pod was active on the node longer than specified deadline", }, } - // New recycler _would_ succeed if it was run - newRecycler := &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "recycler-test", - Namespace: api.NamespaceDefault, - }, - Status: api.PodStatus{ - Phase: api.PodSucceeded, - Message: "bar", - }, - } - - client := &mockRecyclerClient{ - pod: oldRecycler, - } - - err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", newRecycler, client) - if err == nil { - t.Fatalf("Expected pod failure but got nil error returned") - } - - // Check the recycler failed with "foo" error message, i.e. it was the - // old recycler that finished and not the new one. - if err != nil { - if !strings.Contains(err.Error(), "foo") { - t.Errorf("Expected pod.Status.Message %s but got %s", oldRecycler.Status.Message, err) + for _, test := range tests { + t.Logf("Test %q", test.name) + client := &mockRecyclerClient{ + events: test.eventSequence, + pod: test.existingPod, + } + err := internalRecycleVolumeByWatchingPodUntilCompletion(test.createPod.Name, test.createPod, client) + receivedError := "" + if err != nil { + receivedError = err.Error() + } + if receivedError != test.expectedError { + t.Errorf("Test %q failed, expected error %q, got %q", test.name, test.expectedError, receivedError) + continue + } + if !client.deletedCalled { + t.Errorf("Test %q failed, expected deferred client.Delete to be called on recycler pod", test.name) + continue + } + for i, expectedEvent := range test.expectedEvents { + if len(client.receivedEvents) <= i { + t.Errorf("Test %q failed, expected event %d: %q not received", test.name, i, expectedEvent.message) + continue + } + receivedEvent := client.receivedEvents[i] + if expectedEvent.eventtype != receivedEvent.eventtype { + t.Errorf("Test %q failed, event %d does not match: expected eventtype %q, got %q", test.name, i, expectedEvent.eventtype, receivedEvent.eventtype) + } + if expectedEvent.message != receivedEvent.message { + t.Errorf("Test %q failed, event %d does not match: expected message %q, got %q", test.name, i, expectedEvent.message, receivedEvent.message) + } + } + for i := len(test.expectedEvents); i < len(client.receivedEvents); i++ { + t.Errorf("Test %q failed, unexpected event received: %s, %q", test.name, client.receivedEvents[i].eventtype, client.receivedEvents[i].message) } - } - if !client.deletedCalled { - t.Errorf("Expected deferred client.Delete to be called on recycler pod") } } type mockRecyclerClient struct { - pod *api.Pod - deletedCalled bool + pod *api.Pod + deletedCalled bool + receivedEvents []mockEvent + events []watch.Event +} + +type mockEvent struct { + eventtype, message string } func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) { @@ -148,10 +237,18 @@ func (c *mockRecyclerClient) DeletePod(name, namespace string) error { return nil } -func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod { - return func() *api.Pod { - return c.pod - } +func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) { + eventCh := make(chan watch.Event, 0) + go func() { + for _, e := range c.events { + eventCh <- e + } + }() + return eventCh, nil +} + +func (c *mockRecyclerClient) Event(eventtype, message string) { + c.receivedEvents = append(c.receivedEvents, mockEvent{eventtype, message}) } func TestCalculateTimeoutForVolume(t *testing.T) {