mirror of https://github.com/k3s-io/k3s
Send recycle events from pod to pv.
This allows users to diagnose what's wrong with the recycler. Recycler pods are started automatically with a cryptic name, and they are deleted immediately when they finish. Since the events land on the PV, kubectl describe pv will show:

  FirstSeen LastSeen Count From                           SubobjectPath Type    Reason      Message
  --------- -------- ----- ----                           ------------- ----    ------      -------
  59m       59m      1     {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Unable to mount volumes for pod "recycler-for-nfs_default(5421800e-347b-11e6-a79b-3c970e965218)": timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  53m       53m      1     {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Unable to mount volumes for pod "recycler-for-nfs_default(3c9809e5-347c-11e6-a79b-3c970e965218)": timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  46m       46m      1     {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Unable to mount volumes for pod "recycler-for-nfs_default(250dd2a2-347d-11e6-a79b-3c970e965218)": timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  40m       40m      1     {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Unable to mount volumes for pod "recycler-for-nfs_default(0d84ea33-347e-11e6-a79b-3c970e965218)": timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  33m       33m      1     {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Unable to mount volumes for pod "recycler-for-nfs_default(f5fb63bf-347e-11e6-a79b-3c970e965218)": timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  27m       27m      1     {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Unable to mount volumes for pod "recycler-for-nfs_default(de7128fd-347f-11e6-a79b-3c970e965218)": timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  1h        3m       75    {persistentvolume-controller }               Normal  RecyclerPod Recycler pod: Successfully assigned recycler-for-nfs to 127.0.0.1
  1h        3m       76    {persistentvolume-controller }               Normal  RecyclerPod Recycler pod: Pod was active on the node longer than specified deadline
  1h        1m       12    {persistentvolume-controller }               Warning RecyclerPod Recycler pod: Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod "recycler-for-nfs"/"default". list of unattached/unmounted volumes=[vol]
  20m       1m       4     {persistentvolume-controller }               Warning RecyclerPod (events with common reason combined)

These steps were necessary:

- added event watcher to volume.RecycleVolumeByWatchingPodUntilCompletion
- pass all these events through volume plugins to volume controller
- rework volume.RecycleVolumeByWatchingPodUntilCompletion unit tests to a table (there was too much copy-paste)
- fix all unit tests along the way
parent bf4e9e9db8
commit d7111b282f
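Before the diff, a condensed sketch of the plumbing this commit introduces. It is an illustration only: pvEventSink is a stand-in for the controller's event recorder, and everything else is boiled down to plain types; the names RecycleEventRecorder, newRecyclerEventRecorder, and the "RecyclerPod" reason match the diff below.

    package main

    import "fmt"

    // RecycleEventRecorder mirrors the callback type added in the volume
    // package (see the RecycleEventRecorder hunk below).
    type RecycleEventRecorder func(eventtype, message string)

    // pvEventSink stands in for the controller's event recorder; it just
    // prints what would be attached to the PV.
    type pvEventSink struct{ pvName string }

    func (s pvEventSink) Eventf(eventtype, reason, format string, args ...interface{}) {
        fmt.Printf("%s  %s  %s  %s\n", s.pvName, eventtype, reason, fmt.Sprintf(format, args...))
    }

    // newRecyclerEventRecorder follows the closure added to the PV controller:
    // whatever the recycler-pod watcher reports is re-emitted on the PV under
    // the "RecyclerPod" reason, which is exactly what kubectl describe pv shows.
    func newRecyclerEventRecorder(sink pvEventSink) RecycleEventRecorder {
        return func(eventtype, message string) {
            sink.Eventf(eventtype, "RecyclerPod", "Recycler pod: %s", message)
        }
    }

    func main() {
        recorder := newRecyclerEventRecorder(pvEventSink{pvName: "nfs"})
        // Two messages a pod watcher might forward:
        recorder("Normal", "Successfully assigned recycler-for-nfs to 127.0.0.1")
        recorder("Warning", "Unable to mount volumes for pod \"recycler-for-nfs\": timeout expired")
    }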
@@ -914,7 +914,6 @@ func (ctrl *PersistentVolumeController) unbindVolume(volume *api.PersistentVolum
 	// Update the status
 	_, err = ctrl.updateVolumePhase(newVol, api.VolumeAvailable, "")
 	return err
 }
 
 // reclaimVolume implements volume.Spec.PersistentVolumeReclaimPolicy and
@@ -997,7 +996,8 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
 	}
 
 	// Plugin found
-	recycler, err := plugin.NewRecycler(volume.Name, spec)
+	recorder := ctrl.newRecyclerEventRecorder(volume)
+	recycler, err := plugin.NewRecycler(volume.Name, spec, recorder)
 	if err != nil {
 		// Cannot create recycler
 		strerr := fmt.Sprintf("Failed to create recycler: %v", err)
@@ -1025,6 +1025,8 @@ func (ctrl *PersistentVolumeController) recycleVolumeOperation(arg interface{})
 	}
 
 	glog.V(2).Infof("volume %q recycled", volume.Name)
+	// Send an event
+	ctrl.eventRecorder.Event(volume, api.EventTypeNormal, "VolumeRecycled", "Volume recycled")
 	// Make the volume available again
 	if err = ctrl.unbindVolume(volume); err != nil {
 		// Oops, could not save the volume and therefore the controller will
@@ -1380,6 +1382,14 @@ func (ctrl *PersistentVolumeController) scheduleOperation(operationName string,
 	}
 }
 
+// newRecyclerEventRecorder returns a RecycleEventRecorder that sends all events
+// to given volume.
+func (ctrl *PersistentVolumeController) newRecyclerEventRecorder(volume *api.PersistentVolume) vol.RecycleEventRecorder {
+	return func(eventtype, message string) {
+		ctrl.eventRecorder.Eventf(volume, eventtype, "RecyclerPod", "Recycler pod: %s", message)
+	}
+}
+
 // findProvisionablePlugin finds a provisioner plugin for a given claim.
 // It returns either the provisioning plugin or nil when an external
 // provisioner is requested.

@@ -1205,7 +1205,7 @@ func (plugin *mockVolumePlugin) GetMetrics() (*vol.Metrics, error) {
 
 // Recycler interfaces
 
-func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec) (vol.Recycler, error) {
+func (plugin *mockVolumePlugin) NewRecycler(pvName string, spec *vol.Spec, eventRecorder vol.RecycleEventRecorder) (vol.Recycler, error) {
 	if len(plugin.recycleCalls) > 0 {
 		// mockVolumePlugin directly implements Recycler interface
 		glog.V(4).Infof("mock plugin NewRecycler called, returning mock recycler")

@@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin
 type hostPathPlugin struct {
 	host volume.VolumeHost
 	// decouple creating Recyclers/Deleters/Provisioners by deferring to a function. Allows for easier testing.
-	newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
+	newRecyclerFunc func(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
 	newDeleterFunc func(spec *volume.Spec, host volume.VolumeHost) (volume.Deleter, error)
 	newProvisionerFunc func(options volume.VolumeOptions, host volume.VolumeHost) (volume.Provisioner, error)
 	config volume.VolumeConfig
@@ -112,8 +112,8 @@ func (plugin *hostPathPlugin) NewUnmounter(volName string, podUID types.UID) (vo
 	}}, nil
 }
 
-func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) {
-	return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
+func (plugin *hostPathPlugin) NewRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder) (volume.Recycler, error) {
+	return plugin.newRecyclerFunc(pvName, spec, eventRecorder, plugin.host, plugin.config)
 }
 
 func (plugin *hostPathPlugin) NewDeleter(spec *volume.Spec) (volume.Deleter, error) {
@@ -142,18 +142,19 @@ func (plugin *hostPathPlugin) ConstructVolumeSpec(volumeName, mountPath string)
 	return volume.NewSpecFromVolume(hostPathVolume), nil
 }
 
-func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
+func newRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
 	if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
 		return nil, fmt.Errorf("spec.PersistentVolumeSource.HostPath is nil")
 	}
 	path := spec.PersistentVolume.Spec.HostPath.Path
 	return &hostPathRecycler{
-		name:    spec.Name(),
-		path:    path,
-		host:    host,
-		config:  config,
-		timeout: volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume),
-		pvName:  pvName,
+		name:          spec.Name(),
+		path:          path,
+		host:          host,
+		config:        config,
+		timeout:       volume.CalculateTimeoutForVolume(config.RecyclerMinimumTimeout, config.RecyclerTimeoutIncrement, spec.PersistentVolume),
+		pvName:        pvName,
+		eventRecorder: eventRecorder,
 	}, nil
 }
 
@@ -234,7 +235,8 @@ type hostPathRecycler struct {
 	config volume.VolumeConfig
 	timeout int64
 	volume.MetricsNil
-	pvName string
+	pvName        string
+	eventRecorder volume.RecycleEventRecorder
 }
 
 func (r *hostPathRecycler) GetPath() string {
@@ -253,7 +255,7 @@ func (r *hostPathRecycler) Recycle() error {
 			Path: r.path,
 		},
 	}
-	return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient())
+	return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient(), r.eventRecorder)
 }
 
 // hostPathProvisioner implements a Provisioner for the HostPath plugin

@@ -78,7 +78,7 @@ func TestRecycler(t *testing.T) {
 	if err != nil {
 		t.Errorf("Can't find the plugin by name")
 	}
-	recycler, err := plug.NewRecycler("pv-name", spec)
+	recycler, err := plug.NewRecycler("pv-name", spec, nil)
 	if err != nil {
 		t.Errorf("Failed to make a new Recyler: %v", err)
 	}

@@ -46,7 +46,7 @@ func ProbeVolumePlugins(volumeConfig volume.VolumeConfig) []volume.VolumePlugin
 type nfsPlugin struct {
 	host volume.VolumeHost
 	// decouple creating recyclers by deferring to a function. Allows for easier testing.
-	newRecyclerFunc func(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
+	newRecyclerFunc func(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error)
 	config volume.VolumeConfig
 }
 
@@ -132,8 +132,8 @@ func (plugin *nfsPlugin) newUnmounterInternal(volName string, podUID types.UID,
 	}}, nil
 }
 
-func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec) (volume.Recycler, error) {
-	return plugin.newRecyclerFunc(pvName, spec, plugin.host, plugin.config)
+func (plugin *nfsPlugin) NewRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder) (volume.Recycler, error) {
+	return plugin.newRecyclerFunc(pvName, spec, eventRecorder, plugin.host, plugin.config)
 }
 
 func (plugin *nfsPlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
@@ -274,18 +274,19 @@ func (c *nfsUnmounter) TearDownAt(dir string) error {
 	return nil
 }
 
-func newRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) {
+func newRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, volumeConfig volume.VolumeConfig) (volume.Recycler, error) {
 	if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.NFS == nil {
 		return nil, fmt.Errorf("spec.PersistentVolumeSource.NFS is nil")
 	}
 	return &nfsRecycler{
-		name:    spec.Name(),
-		server:  spec.PersistentVolume.Spec.NFS.Server,
-		path:    spec.PersistentVolume.Spec.NFS.Path,
-		host:    host,
-		config:  volumeConfig,
-		timeout: volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume),
-		pvName:  pvName,
+		name:          spec.Name(),
+		server:        spec.PersistentVolume.Spec.NFS.Server,
+		path:          spec.PersistentVolume.Spec.NFS.Path,
+		host:          host,
+		config:        volumeConfig,
+		timeout:       volume.CalculateTimeoutForVolume(volumeConfig.RecyclerMinimumTimeout, volumeConfig.RecyclerTimeoutIncrement, spec.PersistentVolume),
+		pvName:        pvName,
+		eventRecorder: eventRecorder,
 	}, nil
 }
 
@@ -298,7 +299,8 @@ type nfsRecycler struct {
 	config volume.VolumeConfig
 	timeout int64
 	volume.MetricsNil
-	pvName string
+	pvName        string
+	eventRecorder volume.RecycleEventRecorder
 }
 
 func (r *nfsRecycler) GetPath() string {
@@ -318,7 +320,7 @@ func (r *nfsRecycler) Recycle() error {
 			Path: r.path,
 		},
 	}
-	return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient())
+	return volume.RecycleVolumeByWatchingPodUntilCompletion(r.pvName, pod, r.host.GetKubeClient(), r.eventRecorder)
 }
 
 func getVolumeSource(spec *volume.Spec) (*api.NFSVolumeSource, bool, error) {

@@ -91,7 +91,7 @@ func TestRecycler(t *testing.T) {
 	if err != nil {
 		t.Errorf("Can't find the plugin by name")
 	}
-	recycler, err := plug.NewRecycler("pv-name", spec)
+	recycler, err := plug.NewRecycler("pv-name", spec, nil)
 	if err != nil {
 		t.Errorf("Failed to make a new Recyler: %v", err)
 	}
@@ -103,7 +103,7 @@ func TestRecycler(t *testing.T) {
 	}
 }
 
-func newMockRecycler(pvName string, spec *volume.Spec, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
+func newMockRecycler(pvName string, spec *volume.Spec, eventRecorder volume.RecycleEventRecorder, host volume.VolumeHost, config volume.VolumeConfig) (volume.Recycler, error) {
 	return &mockRecycler{
 		path: spec.PersistentVolume.Spec.NFS.Path,
 	}, nil

@@ -124,9 +124,12 @@ type PersistentVolumePlugin interface {
 // again to new claims
 type RecyclableVolumePlugin interface {
 	VolumePlugin
-	// NewRecycler creates a new volume.Recycler which knows how to reclaim
-	// this resource after the volume's release from a PersistentVolumeClaim
-	NewRecycler(pvName string, spec *Spec) (Recycler, error)
+	// NewRecycler creates a new volume.Recycler which knows how to reclaim this
+	// resource after the volume's release from a PersistentVolumeClaim. The
+	// recycler will use the provided recorder to write any events that might be
+	// interesting to user. It's expected that caller will pass these events to
+	// the PV being recycled.
+	NewRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) (Recycler, error)
 }
 
 // DeletableVolumePlugin is an extended interface of VolumePlugin and is used

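For a plugin author, the interface change is mechanical: accept the recorder and stash it where Recycle() can reach it. A self-contained sketch of that shape, where myPlugin and myRecycler are invented for illustration and local types stand in for the volume package (the in-tree hostpath and nfs hunks in this diff make exactly the same change):

    package main

    // Local stand-ins for the volume package types involved; only the shape
    // of the NewRecycler signature matters here.
    type (
        RecycleEventRecorder func(eventtype, message string)
        Spec                 struct{ name string }
        Recycler             interface{ Recycle() error }
    )

    // myPlugin and myRecycler are hypothetical; they are not part of the commit.
    type myPlugin struct{}

    type myRecycler struct {
        pvName        string
        spec          *Spec
        eventRecorder RecycleEventRecorder
    }

    // A real Recycle() would hand eventRecorder down to
    // RecycleVolumeByWatchingPodUntilCompletion; here it only emits one event.
    func (r *myRecycler) Recycle() error {
        r.eventRecorder("Normal", "recycling "+r.pvName)
        return nil
    }

    // NewRecycler gains the eventRecorder parameter and threads it through
    // to the recycler it constructs.
    func (p *myPlugin) NewRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) (Recycler, error) {
        return &myRecycler{pvName: pvName, spec: spec, eventRecorder: eventRecorder}, nil
    }

    func main() {
        p := &myPlugin{}
        r, _ := p.NewRecycler("pv-name", &Spec{name: "vol"}, func(eventtype, message string) {
            println(eventtype + ": " + message)
        })
        _ = r.Recycle()
    }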
@@ -273,7 +273,7 @@ func (plugin *FakeVolumePlugin) GetNewDetacherCallCount() int {
 	return plugin.NewDetacherCallCount
 }
 
-func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec) (Recycler, error) {
+func (plugin *FakeVolumePlugin) NewRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder) (Recycler, error) {
 	return &fakeRecycler{"/attributesTransferredFromSpec", MetricsNil{}}, nil
 }
 
@@ -457,7 +457,7 @@ func (fr *fakeRecycler) GetPath() string {
 	return fr.path
 }
 
-func NewFakeRecycler(pvName string, spec *Spec, host VolumeHost, config VolumeConfig) (Recycler, error) {
+func NewFakeRecycler(pvName string, spec *Spec, eventRecorder RecycleEventRecorder, host VolumeHost, config VolumeConfig) (Recycler, error) {
 	if spec.PersistentVolume == nil || spec.PersistentVolume.Spec.HostPath == nil {
 		return nil, fmt.Errorf("fakeRecycler only supports spec.PersistentVolume.Spec.HostPath")
 	}

@@ -19,13 +19,10 @@ package volume
 import (
 	"fmt"
-	"reflect"
-	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/client/cache"
 	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
 	"k8s.io/kubernetes/pkg/fields"
-	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/watch"
 
 	"hash/fnv"
@@ -39,6 +36,8 @@ import (
 	"k8s.io/kubernetes/pkg/util/sets"
 )
 
+type RecycleEventRecorder func(eventtype, message string)
+
 // RecycleVolumeByWatchingPodUntilCompletion is intended for use with volume
 // Recyclers. This function will save the given Pod to the API and watch it
 // until it completes, fails, or the pod's ActiveDeadlineSeconds is exceeded,
@@ -52,8 +51,8 @@
 // pod - the pod designed by a volume plugin to recycle the volume. pod.Name
 //   will be overwritten with unique name based on PV.Name.
 // client - kube client for API operations.
-func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface) error {
-	return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient))
+func RecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.Pod, kubeClient clientset.Interface, recorder RecycleEventRecorder) error {
+	return internalRecycleVolumeByWatchingPodUntilCompletion(pvName, pod, newRecyclerClient(kubeClient, recorder))
 }
 
 // same as above func comments, except 'recyclerClient' is a narrower pod API
@@ -67,34 +66,61 @@ func internalRecycleVolumeByWatchingPodUntilCompletion(pvName string, pod *api.P
 	pod.Name = "recycler-for-" + pvName
 	pod.GenerateName = ""
 
+	stopChannel := make(chan struct{})
+	defer close(stopChannel)
+	podCh, err := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
+	if err != nil {
+		glog.V(4).Infof("cannot start watcher for pod %s/%s: %v", pod.Namespace, pod.Name, err)
+		return err
+	}
+
 	// Start the pod
-	_, err := recyclerClient.CreatePod(pod)
+	_, err = recyclerClient.CreatePod(pod)
 	if err != nil {
 		if errors.IsAlreadyExists(err) {
 			glog.V(5).Infof("old recycler pod %q found for volume", pod.Name)
 		} else {
-			return fmt.Errorf("Unexpected error creating recycler pod: %+v\n", err)
+			return fmt.Errorf("unexpected error creating recycler pod: %+v\n", err)
 		}
 	}
 	defer recyclerClient.DeletePod(pod.Name, pod.Namespace)
 
-	// Now only the old pod or the new pod run. Watch it until it finishes.
-	stopChannel := make(chan struct{})
-	defer close(stopChannel)
-	nextPod := recyclerClient.WatchPod(pod.Name, pod.Namespace, stopChannel)
-
+	// Now only the old pod or the new pod run. Watch it until it finishes
+	// and send all events on the pod to the PV
 	for {
-		watchedPod := nextPod()
-		if watchedPod.Status.Phase == api.PodSucceeded {
-			// volume.Recycle() returns nil on success, else error
-			return nil
-		}
-		if watchedPod.Status.Phase == api.PodFailed {
-			// volume.Recycle() returns nil on success, else error
-			if watchedPod.Status.Message != "" {
-				return fmt.Errorf(watchedPod.Status.Message)
-			} else {
-				return fmt.Errorf("pod failed, pod.Status.Message unknown.")
-			}
-		}
+		event := <-podCh
+		switch event.Object.(type) {
+		case *api.Pod:
+			// POD changed
+			pod := event.Object.(*api.Pod)
+			glog.V(4).Infof("recycler pod update received: %s %s/%s %s", event.Type, pod.Namespace, pod.Name, pod.Status.Phase)
+			switch event.Type {
+			case watch.Added, watch.Modified:
+				if pod.Status.Phase == api.PodSucceeded {
+					// Recycle succeeded.
+					return nil
+				}
+				if pod.Status.Phase == api.PodFailed {
+					if pod.Status.Message != "" {
+						return fmt.Errorf(pod.Status.Message)
+					} else {
+						return fmt.Errorf("pod failed, pod.Status.Message unknown.")
+					}
+				}
+
+			case watch.Deleted:
+				return fmt.Errorf("recycler pod was deleted")
+
+			case watch.Error:
+				return fmt.Errorf("recycler pod watcher failed")
+			}
+
+		case *api.Event:
+			// Event received
+			podEvent := event.Object.(*api.Event)
+			glog.V(4).Infof("recycler event received: %s %s/%s %s/%s %s", event.Type, podEvent.Namespace, podEvent.Name, podEvent.InvolvedObject.Namespace, podEvent.InvolvedObject.Name, podEvent.Message)
+			if event.Type == watch.Added {
+				recyclerClient.Event(podEvent.Type, podEvent.Message)
+			}
+		}
 	}
 }
@@ -106,15 +132,24 @@ type recyclerClient interface {
 	CreatePod(pod *api.Pod) (*api.Pod, error)
 	GetPod(name, namespace string) (*api.Pod, error)
 	DeletePod(name, namespace string) error
-	WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod
+	// WatchPod returns a ListWatch for watching a pod. The stopChannel is used
+	// to close the reflector backing the watch. The caller is responsible for
+	// deferring a close on the channel to stop the reflector.
+	WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error)
+	// Event sends an event to the volume that is being recycled.
+	Event(eventtype, message string)
 }
 
-func newRecyclerClient(client clientset.Interface) recyclerClient {
-	return &realRecyclerClient{client}
+func newRecyclerClient(client clientset.Interface, recorder RecycleEventRecorder) recyclerClient {
+	return &realRecyclerClient{
+		client,
+		recorder,
+	}
 }
 
 type realRecyclerClient struct {
-	client clientset.Interface
+	client   clientset.Interface
+	recorder RecycleEventRecorder
 }
 
 func (c *realRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
@@ -129,28 +164,60 @@ func (c *realRecyclerClient) DeletePod(name, namespace string) error {
 	return c.client.Core().Pods(namespace).Delete(name, nil)
 }
 
-// WatchPod returns a ListWatch for watching a pod. The stopChannel is used
-// to close the reflector backing the watch. The caller is responsible for
-// deferring a close on the channel to stop the reflector.
-func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod {
-	fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
-
-	podLW := &cache.ListWatch{
-		ListFunc: func(options api.ListOptions) (runtime.Object, error) {
-			options.FieldSelector = fieldSelector
-			return c.client.Core().Pods(namespace).List(options)
-		},
-		WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
-			options.FieldSelector = fieldSelector
-			return c.client.Core().Pods(namespace).Watch(options)
-		},
-	}
-	queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
-	cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)
-
-	return func() *api.Pod {
-		return cache.Pop(queue).(*api.Pod)
-	}
+func (c *realRecyclerClient) Event(eventtype, message string) {
+	c.recorder(eventtype, message)
+}
+
+func (c *realRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
+	podSelector, _ := fields.ParseSelector("metadata.name=" + name)
+	options := api.ListOptions{
+		FieldSelector: podSelector,
+		Watch:         true,
+	}
+
+	podWatch, err := c.client.Core().Pods(namespace).Watch(options)
+	if err != nil {
+		return nil, err
+	}
+
+	eventSelector, _ := fields.ParseSelector("involvedObject.name=" + name)
+	eventWatch, err := c.client.Core().Events(namespace).Watch(api.ListOptions{
+		FieldSelector: eventSelector,
+		Watch:         true,
+	})
+	if err != nil {
+		podWatch.Stop()
+		return nil, err
+	}
+
+	eventCh := make(chan watch.Event, 0)
+
+	go func() {
+		defer eventWatch.Stop()
+		defer podWatch.Stop()
+		defer close(eventCh)
+
+		for {
+			select {
+			case _ = <-stopChannel:
+				return
+
+			case podEvent, ok := <-podWatch.ResultChan():
+				if !ok {
+					return
+				}
+				eventCh <- podEvent
+
+			case eventEvent, ok := <-eventWatch.ResultChan():
+				if !ok {
+					return
+				}
+				eventCh <- eventEvent
+			}
+		}
+	}()
+
+	return eventCh, nil
 }
 
 // CalculateTimeoutForVolume calculates time for a Recycler pod to complete a

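Stripped of the Kubernetes types, the forwarding goroutine that WatchPod sets up above is a standard two-source fan-in with a stop channel. A self-contained sketch (strings stand in for watch.Event values; the real code forwards events from podWatch and eventWatch):

    package main

    import "fmt"

    // mergeEvents forwards from two sources into one channel until either
    // source closes or stop is signalled, mirroring WatchPod's goroutine.
    func mergeEvents(pods, events <-chan string, stop <-chan struct{}) <-chan string {
        out := make(chan string)
        go func() {
            defer close(out)
            for {
                select {
                case <-stop:
                    return
                case e, ok := <-pods:
                    if !ok {
                        return
                    }
                    out <- e
                case e, ok := <-events:
                    if !ok {
                        return
                    }
                    out <- e
                }
            }
        }()
        return out
    }

    func main() {
        pods := make(chan string, 1)
        events := make(chan string, 1)
        stop := make(chan struct{})
        pods <- "pod: Running"
        events <- "event: pulling image"
        merged := mergeEvents(pods, events, stop)
        // Order between the two sources is not guaranteed.
        fmt.Println(<-merged)
        fmt.Println(<-merged)
        close(stop)
    }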
@@ -24,106 +24,195 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/errors"
 	"k8s.io/kubernetes/pkg/api/resource"
+	"k8s.io/kubernetes/pkg/watch"
 )
 
-func TestRecyclerSuccess(t *testing.T) {
-	client := &mockRecyclerClient{}
-	recycler := &api.Pod{
-		ObjectMeta: api.ObjectMeta{
-			Namespace: api.NamespaceDefault,
-		},
-		Status: api.PodStatus{
-			Phase: api.PodSucceeded,
-		},
-	}
-
-	err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client)
-	if err != nil {
-		t.Errorf("Unexpected error watching recycler pod: %+v", err)
-	}
-	if !client.deletedCalled {
-		t.Errorf("Expected deferred client.Delete to be called on recycler pod")
-	}
-}
-
-func TestRecyclerFailure(t *testing.T) {
-	client := &mockRecyclerClient{}
-	recycler := &api.Pod{
-		ObjectMeta: api.ObjectMeta{
-			Namespace: api.NamespaceDefault,
-		},
-		Status: api.PodStatus{
-			Phase:   api.PodFailed,
-			Message: "foo",
-		},
-	}
-
-	err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", recycler, client)
-	if err == nil {
-		t.Fatalf("Expected pod failure but got nil error returned")
-	}
-	if err != nil {
-		if !strings.Contains(err.Error(), "foo") {
-			t.Errorf("Expected pod.Status.Message %s but got %s", recycler.Status.Message, err)
-		}
-	}
-	if !client.deletedCalled {
-		t.Errorf("Expected deferred client.Delete to be called on recycler pod")
-	}
-}
-
-func TestRecyclerAlreadyExists(t *testing.T) {
-	// Test that internalRecycleVolumeByWatchingPodUntilCompletion does not
-	// start a new recycler when an old one is already running.
-
-	// Old recycler is running and fails with "foo" error message
-	oldRecycler := &api.Pod{
-		ObjectMeta: api.ObjectMeta{
-			Name:      "recycler-test",
-			Namespace: api.NamespaceDefault,
-		},
-		Status: api.PodStatus{
-			Phase:   api.PodFailed,
-			Message: "foo",
-		},
-	}
-
-	// New recycler _would_ succeed if it was run
-	newRecycler := &api.Pod{
-		ObjectMeta: api.ObjectMeta{
-			Name:      "recycler-test",
-			Namespace: api.NamespaceDefault,
-		},
-		Status: api.PodStatus{
-			Phase:   api.PodSucceeded,
-			Message: "bar",
-		},
-	}
-
-	client := &mockRecyclerClient{
-		pod: oldRecycler,
-	}
-
-	err := internalRecycleVolumeByWatchingPodUntilCompletion("pv-name", newRecycler, client)
-	if err == nil {
-		t.Fatalf("Expected pod failure but got nil error returned")
-	}
-
-	// Check the recycler failed with "foo" error message, i.e. it was the
-	// old recycler that finished and not the new one.
-	if err != nil {
-		if !strings.Contains(err.Error(), "foo") {
-			t.Errorf("Expected pod.Status.Message %s but got %s", oldRecycler.Status.Message, err)
-		}
-	}
-	if !client.deletedCalled {
-		t.Errorf("Expected deferred client.Delete to be called on recycler pod")
-	}
-}
+type testcase struct {
+	// Input of the test
+	name        string
+	existingPod *api.Pod
+	createPod   *api.Pod
+	// eventSequence is list of events that are simulated during recycling. It
+	// can be either event generated by a recycler pod or a state change of
+	// the pod. (see newPodEvent and newEvent below).
+	eventSequence []watch.Event
+
+	// Expected output.
+	// expectedEvents is list of events that were sent to the volume that was
+	// recycled.
+	expectedEvents []mockEvent
+	expectedError  string
+}
+
+func newPodEvent(eventtype watch.EventType, name string, phase api.PodPhase, message string) watch.Event {
+	return watch.Event{
+		Type:   eventtype,
+		Object: newPod(name, phase, message),
+	}
+}
+
+func newEvent(eventtype, message string) watch.Event {
+	return watch.Event{
+		Type: watch.Added,
+		Object: &api.Event{
+			ObjectMeta: api.ObjectMeta{
+				Namespace: api.NamespaceDefault,
+			},
+			Reason:  "MockEvent",
+			Message: message,
+			Type:    eventtype,
+		},
+	}
+}
+
+func newPod(name string, phase api.PodPhase, message string) *api.Pod {
+	return &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Namespace: api.NamespaceDefault,
+			Name:      name,
+		},
+		Status: api.PodStatus{
+			Phase:   phase,
+			Message: message,
+		},
+	}
+}
+
+func TestRecyclerPod(t *testing.T) {
+	tests := []testcase{
+		{
+			// Test recycler success with some events
+			name:      "RecyclerSuccess",
+			createPod: newPod("podRecyclerSuccess", api.PodPending, ""),
+			eventSequence: []watch.Event{
+				// Pod gets Running and Succeeded
+				newPodEvent(watch.Added, "podRecyclerSuccess", api.PodPending, ""),
+				newEvent(api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerSuccess to 127.0.0.1"),
+				newEvent(api.EventTypeNormal, "pulling image \"gcr.io/google_containers/busybox\""),
+				newEvent(api.EventTypeNormal, "Successfully pulled image \"gcr.io/google_containers/busybox\""),
+				newEvent(api.EventTypeNormal, "Created container with docker id 83d929aeac82"),
+				newEvent(api.EventTypeNormal, "Started container with docker id 83d929aeac82"),
+				newPodEvent(watch.Modified, "podRecyclerSuccess", api.PodRunning, ""),
+				newPodEvent(watch.Modified, "podRecyclerSuccess", api.PodSucceeded, ""),
+			},
+			expectedEvents: []mockEvent{
+				{api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerSuccess to 127.0.0.1"},
+				{api.EventTypeNormal, "pulling image \"gcr.io/google_containers/busybox\""},
+				{api.EventTypeNormal, "Successfully pulled image \"gcr.io/google_containers/busybox\""},
+				{api.EventTypeNormal, "Created container with docker id 83d929aeac82"},
+				{api.EventTypeNormal, "Started container with docker id 83d929aeac82"},
+			},
+			expectedError: "",
+		},
+		{
+			// Test recycler failure with some events
+			name:      "RecyclerFailure",
+			createPod: newPod("podRecyclerFailure", api.PodPending, ""),
+			eventSequence: []watch.Event{
+				// Pod gets Running and Succeeded
+				newPodEvent(watch.Added, "podRecyclerFailure", api.PodPending, ""),
+				newEvent(api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerFailure to 127.0.0.1"),
+				newEvent(api.EventTypeWarning, "Unable to mount volumes for pod \"recycler-for-podRecyclerFailure_default(3c9809e5-347c-11e6-a79b-3c970e965218)\": timeout expired waiting for volumes to attach/mount"),
+				newEvent(api.EventTypeWarning, "Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod \"recycler-for-podRecyclerFailure\"/\"default\". list of unattached/unmounted"),
+				newPodEvent(watch.Modified, "podRecyclerFailure", api.PodRunning, ""),
+				newPodEvent(watch.Modified, "podRecyclerFailure", api.PodFailed, "Pod was active on the node longer than specified deadline"),
+			},
+			expectedEvents: []mockEvent{
+				{api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerFailure to 127.0.0.1"},
+				{api.EventTypeWarning, "Unable to mount volumes for pod \"recycler-for-podRecyclerFailure_default(3c9809e5-347c-11e6-a79b-3c970e965218)\": timeout expired waiting for volumes to attach/mount"},
+				{api.EventTypeWarning, "Error syncing pod, skipping: timeout expired waiting for volumes to attach/mount for pod \"recycler-for-podRecyclerFailure\"/\"default\". list of unattached/unmounted"},
+			},
+			expectedError: "Pod was active on the node longer than specified deadline",
+		},
+		{
+			// Recycler pod gets deleted
+			name:      "RecyclerDeleted",
+			createPod: newPod("podRecyclerDeleted", api.PodPending, ""),
+			eventSequence: []watch.Event{
+				// Pod gets Running and Succeeded
+				newPodEvent(watch.Added, "podRecyclerDeleted", api.PodPending, ""),
+				newEvent(api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerDeleted to 127.0.0.1"),
+				newPodEvent(watch.Deleted, "podRecyclerDeleted", api.PodPending, ""),
+			},
+			expectedEvents: []mockEvent{
+				{api.EventTypeNormal, "Successfully assigned recycler-for-podRecyclerDeleted to 127.0.0.1"},
+			},
+			expectedError: "recycler pod was deleted",
+		},
+		{
+			// Another recycler pod is already running
+			name:        "RecyclerRunning",
+			existingPod: newPod("podOldRecycler", api.PodRunning, ""),
+			createPod:   newPod("podNewRecycler", api.PodFailed, "mock message"),
+			eventSequence: []watch.Event{
+				// Old pod succeeds
+				newPodEvent(watch.Modified, "podOldRecycler", api.PodSucceeded, ""),
+			},
+			// No error = old pod succeeded. If the new pod was used, there
+			// would be error with "mock message".
+			expectedError: "",
+		},
+		{
+			// Another recycler pod is already running and fails
+			name:        "FailedRecyclerRunning",
+			existingPod: newPod("podOldRecycler", api.PodRunning, ""),
+			createPod:   newPod("podNewRecycler", api.PodFailed, "mock message"),
+			eventSequence: []watch.Event{
+				// Old pod failure
+				newPodEvent(watch.Modified, "podOldRecycler", api.PodFailed, "Pod was active on the node longer than specified deadline"),
+			},
+			// If the new pod was used, there would be error with "mock message".
+			expectedError: "Pod was active on the node longer than specified deadline",
+		},
+	}
+
+	for _, test := range tests {
+		t.Logf("Test %q", test.name)
+		client := &mockRecyclerClient{
+			events: test.eventSequence,
+			pod:    test.existingPod,
+		}
+		err := internalRecycleVolumeByWatchingPodUntilCompletion(test.createPod.Name, test.createPod, client)
+		receivedError := ""
+		if err != nil {
+			receivedError = err.Error()
+		}
+		if receivedError != test.expectedError {
+			t.Errorf("Test %q failed, expected error %q, got %q", test.name, test.expectedError, receivedError)
+			continue
+		}
+		if !client.deletedCalled {
+			t.Errorf("Test %q failed, expected deferred client.Delete to be called on recycler pod", test.name)
+			continue
+		}
+		for i, expectedEvent := range test.expectedEvents {
+			if len(client.receivedEvents) <= i {
+				t.Errorf("Test %q failed, expected event %d: %q not received", test.name, i, expectedEvent.message)
+				continue
+			}
+			receivedEvent := client.receivedEvents[i]
+			if expectedEvent.eventtype != receivedEvent.eventtype {
+				t.Errorf("Test %q failed, event %d does not match: expected eventtype %q, got %q", test.name, i, expectedEvent.eventtype, receivedEvent.eventtype)
+			}
+			if expectedEvent.message != receivedEvent.message {
+				t.Errorf("Test %q failed, event %d does not match: expected message %q, got %q", test.name, i, expectedEvent.message, receivedEvent.message)
+			}
+		}
+		for i := len(test.expectedEvents); i < len(client.receivedEvents); i++ {
+			t.Errorf("Test %q failed, unexpected event received: %s, %q", test.name, client.receivedEvents[i].eventtype, client.receivedEvents[i].message)
+		}
+	}
+}
 
 type mockRecyclerClient struct {
-	pod           *api.Pod
-	deletedCalled bool
+	pod            *api.Pod
+	deletedCalled  bool
+	receivedEvents []mockEvent
+	events         []watch.Event
 }
 
+type mockEvent struct {
+	eventtype, message string
+}
+
 func (c *mockRecyclerClient) CreatePod(pod *api.Pod) (*api.Pod, error) {
@@ -148,10 +237,18 @@ func (c *mockRecyclerClient) DeletePod(name, namespace string) error {
 	return nil
 }
 
-func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) func() *api.Pod {
-	return func() *api.Pod {
-		return c.pod
-	}
-}
+func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
+	eventCh := make(chan watch.Event, 0)
+	go func() {
+		for _, e := range c.events {
+			eventCh <- e
+		}
+	}()
+	return eventCh, nil
+}
+
+func (c *mockRecyclerClient) Event(eventtype, message string) {
+	c.receivedEvents = append(c.receivedEvents, mockEvent{eventtype, message})
+}
 
 func TestCalculateTimeoutForVolume(t *testing.T) {
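One design note on the mock above: its WatchPod feeds an unbuffered channel from a goroutine, so a test that stops reading early leaves that goroutine blocked until the test process exits. An alternative sketch — an assumption on my part, not something this commit does — buffers the channel to the length of the scripted sequence and needs no goroutine at all:

    // Hypothetical variant of the mock's WatchPod: the buffer is sized to the
    // scripted event sequence, so the sends below can never block.
    func (c *mockRecyclerClient) WatchPod(name, namespace string, stopChannel chan struct{}) (<-chan watch.Event, error) {
        eventCh := make(chan watch.Event, len(c.events))
        for _, e := range c.events {
            eventCh <- e
        }
        return eventCh, nil
    }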
|