diff --git a/pkg/kubectl/cmd/drain.go b/pkg/kubectl/cmd/drain.go index 19a4b249ee..c5948b2c2c 100644 --- a/pkg/kubectl/cmd/drain.go +++ b/pkg/kubectl/cmd/drain.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -572,38 +573,39 @@ func (o *DrainOptions) deleteOrEvictPods(pods []corev1.Pod) error { } func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { - doneCh := make(chan bool, len(pods)) - errCh := make(chan error, 1) + returnCh := make(chan error, 1) for _, pod := range pods { - go func(pod corev1.Pod, doneCh chan bool, errCh chan error) { + go func(pod corev1.Pod, returnCh chan error) { var err error for { err = o.evictPod(pod, policyGroupVersion) if err == nil { break } else if apierrors.IsNotFound(err) { - doneCh <- true + returnCh <- nil return } else if apierrors.IsTooManyRequests(err) { fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err) time.Sleep(5 * time.Second) } else { - errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) return } } podArray := []corev1.Pod{pod} _, err = o.waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn) if err == nil { - doneCh <- true + returnCh <- nil } else { - errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) } - }(pod, doneCh, errCh) + }(pod, returnCh) } doneCount := 0 + var errors []error + // 0 timeout means infinite, we use MaxInt64 to represent it. var globalTimeout time.Duration if o.Timeout == 0 { @@ -612,19 +614,19 @@ func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, g globalTimeout = o.Timeout } globalTimeoutCh := time.After(globalTimeout) - for { + numPods := len(pods) + for doneCount < numPods { select { - case err := <-errCh: - return err - case <-doneCh: + case err := <-returnCh: doneCount++ - if doneCount == len(pods) { - return nil + if err != nil { + errors = append(errors, err) } case <-globalTimeoutCh: return fmt.Errorf("Drain did not complete within %v", globalTimeout) } } + return utilerrors.NewAggregate(errors) } func (o *DrainOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {