From 3d4a02abb54244861f9f05b8db2fdfdaa2c6f67c Mon Sep 17 00:00:00 2001 From: Tomas Nozicka Date: Thu, 2 Aug 2018 14:11:59 +0200 Subject: [PATCH] Rename Until to UntilWithoutRetry and move to using context so it's cancelable --- pkg/client/tests/listwatch_test.go | 3 +- pkg/kubectl/cmd/get/get.go | 8 ++- pkg/kubectl/cmd/rollout/rollout_status.go | 11 +++- pkg/kubectl/cmd/run.go | 15 +++-- pkg/kubectl/cmd/wait/wait.go | 16 +++-- pkg/kubectl/polymorphichelpers/helpers.go | 7 +- .../k8s.io/client-go/tools/cache/listwatch.go | 9 ++- .../src/k8s.io/client-go/tools/watch/until.go | 49 +++++++++----- .../client-go/tools/watch/until_test.go | 66 ++++++++++--------- test/e2e/apps/statefulset.go | 16 +++-- test/e2e/common/init_container.go | 22 +++++-- test/e2e/framework/deployment_util.go | 6 +- test/e2e/framework/util.go | 16 +++-- test/e2e_node/apparmor_test.go | 6 +- test/integration/auth/rbac_test.go | 6 +- test/integration/quota/quota_test.go | 10 ++- 16 files changed, 178 insertions(+), 88 deletions(-) diff --git a/pkg/client/tests/listwatch_test.go b/pkg/client/tests/listwatch_test.go index ed9907ff3b..0fcc79c486 100644 --- a/pkg/client/tests/listwatch_test.go +++ b/pkg/client/tests/listwatch_test.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" . "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/api/testapi" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" @@ -201,7 +202,7 @@ func TestListWatchUntil(t *testing.T) { watch: fw, } - conditions := []watch.ConditionFunc{ + conditions := []watchtools.ConditionFunc{ func(event watch.Event) (bool, error) { t.Logf("got %#v", event) return event.Type == watch.Added, nil diff --git a/pkg/kubectl/cmd/get/get.go b/pkg/kubectl/cmd/get/get.go index f28db78081..f563a6c7b5 100644 --- a/pkg/kubectl/cmd/get/get.go +++ b/pkg/kubectl/cmd/get/get.go @@ -17,6 +17,7 @@ limitations under the License. package get import ( + "context" "encoding/json" "fmt" "io" @@ -36,6 +37,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/rest" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/kubectl" @@ -564,9 +566,11 @@ func (o *GetOptions) watch(f cmdutil.Factory, cmd *cobra.Command, args []string) } first := true - intr := interrupt.New(nil, w.Stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + intr := interrupt.New(nil, cancel) intr.Run(func() error { - _, err := watch.Until(0, w, func(e watch.Event) (bool, error) { + _, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { if !isList && first { // drop the initial watch event in the single resource case first = false diff --git a/pkg/kubectl/cmd/rollout/rollout_status.go b/pkg/kubectl/cmd/rollout/rollout_status.go index 3dccd06773..0a6d237ba7 100644 --- a/pkg/kubectl/cmd/rollout/rollout_status.go +++ b/pkg/kubectl/cmd/rollout/rollout_status.go @@ -17,12 +17,15 @@ limitations under the License. package rollout import ( + "context" "fmt" + "time" "github.com/spf13/cobra" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/watch" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" @@ -191,9 +194,13 @@ func (o *RolloutStatusOptions) Run() error { } // if the rollout isn't done yet, keep watching deployment status - intr := interrupt.New(nil, w.Stop) + // TODO: expose timeout + timeout := 0 * time.Second + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + intr := interrupt.New(nil, cancel) return intr.Run(func() error { - _, err := watch.Until(0, w, func(e watch.Event) (bool, error) { + _, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { // print deployment's status status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision) if err != nil { diff --git a/pkg/kubectl/cmd/run.go b/pkg/kubectl/cmd/run.go index 5a9d9578f7..16933555b7 100644 --- a/pkg/kubectl/cmd/run.go +++ b/pkg/kubectl/cmd/run.go @@ -17,12 +17,13 @@ limitations under the License. package cmd import ( + "context" "fmt" + "time" "github.com/docker/distribution/reference" - "github.com/spf13/cobra" - "github.com/golang/glog" + "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -34,6 +35,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" @@ -467,16 +469,19 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R } // waitForPod watches the given pod until the exitCondition is true -func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watch.ConditionFunc) (*corev1.Pod, error) { +func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) { w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name})) if err != nil { return nil, err } - intr := interrupt.New(nil, w.Stop) + // TODO: expose the timeout + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second) + defer cancel() + intr := interrupt.New(nil, cancel) var result *corev1.Pod err = intr.Run(func() error { - ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) { + ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) { return exitCondition(ev) }) if ev != nil { diff --git a/pkg/kubectl/cmd/wait/wait.go b/pkg/kubectl/cmd/wait/wait.go index 69cdb6c5e5..ad2ea0af7a 100644 --- a/pkg/kubectl/cmd/wait/wait.go +++ b/pkg/kubectl/cmd/wait/wait.go @@ -17,6 +17,7 @@ limitations under the License. package wait import ( + "context" "errors" "fmt" "strings" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/genericclioptions" @@ -272,11 +274,14 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error // we're out of time return gottenObj, false, wait.ErrWaitTimeout } - watchEvent, err := watch.Until(o.Timeout, objWatch, isDeleted) + + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) + watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted) + cancel() switch { case err == nil: return watchEvent.Object, true, nil - case err == watch.ErrWatchClosed: + case err == watchtools.ErrWatchClosed: continue case err == wait.ErrWaitTimeout: if watchEvent != nil { @@ -334,11 +339,14 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru // we're out of time return gottenObj, false, wait.ErrWaitTimeout } - watchEvent, err := watch.Until(o.Timeout, objWatch, w.isConditionMet) + + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout) + watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet) + cancel() switch { case err == nil: return watchEvent.Object, true, nil - case err == watch.ErrWatchClosed: + case err == watchtools.ErrWatchClosed: continue case err == wait.ErrWaitTimeout: if watchEvent != nil { diff --git a/pkg/kubectl/polymorphichelpers/helpers.go b/pkg/kubectl/polymorphichelpers/helpers.go index 3e1ba59cce..f55ec48933 100644 --- a/pkg/kubectl/polymorphichelpers/helpers.go +++ b/pkg/kubectl/polymorphichelpers/helpers.go @@ -17,6 +17,7 @@ limitations under the License. package polymorphichelpers import ( + "context" "fmt" "sort" "time" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" coreclient "k8s.io/client-go/kubernetes/typed/core/v1" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/apis/apps" "k8s.io/kubernetes/pkg/apis/batch" api "k8s.io/kubernetes/pkg/apis/core" @@ -69,7 +71,10 @@ func GetFirstPod(client coreclient.PodsGetter, namespace string, selector string condition := func(event watch.Event) (bool, error) { return event.Type == watch.Added || event.Type == watch.Modified, nil } - event, err := watch.Until(timeout, w, condition) + + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + event, err := watchtools.UntilWithoutRetry(ctx, w, condition) if err != nil { return nil, 0, err } diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index 8bf41f517e..30463aea7d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -28,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/watch" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/pager" + watchtools "k8s.io/client-go/tools/watch" ) // ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource. @@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) // ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout // if timeout is exceeded without all conditions returning true, or an error if an error occurs. // TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until. -func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) { +func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) { if len(conditions) == 0 { return nil, nil } @@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch return nil, err } - evt, err := watch.Until(timeout, watchInterface, remainingConditions...) - if err == watch.ErrWatchClosed { + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...) + if err == watchtools.ErrWatchClosed { // present a consistent error interface to callers err = wait.ErrWaitTimeout } diff --git a/staging/src/k8s.io/client-go/tools/watch/until.go b/staging/src/k8s.io/client-go/tools/watch/until.go index c2772ddb57..4a891b2351 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until.go +++ b/staging/src/k8s.io/client-go/tools/watch/until.go @@ -17,38 +17,39 @@ limitations under the License. package watch import ( + "context" "errors" "time" + "github.com/golang/glog" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" ) // ConditionFunc returns true if the condition has been reached, false if it has not been reached yet, // or an error if the condition cannot be checked and should terminate. In general, it is better to define // level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed // from false to true). -type ConditionFunc func(event Event) (bool, error) +type ConditionFunc func(event watch.Event) (bool, error) -// ErrWatchClosed is returned when the watch channel is closed before timeout in Until. -var ErrWatchClosed = errors.New("watch closed before Until timeout") +// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry. +var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout") -// Until reads items from the watch until each provided condition succeeds, and then returns the last watch +// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch // encountered. The first condition that returns an error terminates the watch (and the event is also returned). // If no event has been received, the returned event will be nil. // Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition. -// A zero timeout means to wait forever. -func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) { +// Waits until context deadline or until context is canceled. +// +// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!! +// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error. +// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below, +// Warning: solving such issues. +// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone. +func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) { ch := watcher.ResultChan() defer watcher.Stop() - var after <-chan time.Time - if timeout > 0 { - after = time.After(timeout) - } else { - ch := make(chan time.Time) - defer close(ch) - after = ch - } - var lastEvent *Event + var lastEvent *watch.Event for _, condition := range conditions { // check the next condition against the previous event and short circuit waiting for the next watch if lastEvent != nil { @@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc } lastEvent = &event - // TODO: check for watch expired error and retry watch from latest point? done, err := condition(event) if err != nil { return lastEvent, err @@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc break ConditionSucceeded } - case <-after: + case <-ctx.Done(): return lastEvent, wait.ErrWaitTimeout } } } return lastEvent, nil } + +// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration. +func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { + if timeout < 0 { + // This should be handled in validation + glog.Errorf("Timeout for context shall not be negative!") + timeout = 0 + } + + if timeout == 0 { + return context.WithCancel(parent) + } + + return context.WithTimeout(parent, timeout) +} diff --git a/staging/src/k8s.io/client-go/tools/watch/until_test.go b/staging/src/k8s.io/client-go/tools/watch/until_test.go index e872c36813..e766acd736 100644 --- a/staging/src/k8s.io/client-go/tools/watch/until_test.go +++ b/staging/src/k8s.io/client-go/tools/watch/until_test.go @@ -17,6 +17,7 @@ limitations under the License. package watch import ( + "context" "errors" "strings" "testing" @@ -25,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" ) type fakePod struct { @@ -35,26 +37,26 @@ func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjec func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") } func TestUntil(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) fw.Modify(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Modified, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil }, } - timeout := time.Minute - lastEvent, err := Until(timeout, fw, conditions...) + ctx, _ := context.WithTimeout(context.Background(), time.Minute) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Modified { + if lastEvent.Type != watch.Modified { t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -63,25 +65,25 @@ func TestUntil(t *testing.T) { } func TestUntilMultipleConditions(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, } - timeout := time.Minute - lastEvent, err := Until(timeout, fw, conditions...) + ctx, _ := context.WithTimeout(context.Background(), time.Minute) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Added { + if lastEvent.Type != watch.Added { t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -90,26 +92,26 @@ func TestUntilMultipleConditions(t *testing.T) { } func TestUntilMultipleConditionsFail(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return event.Type == Deleted, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil }, } - timeout := 10 * time.Second - lastEvent, err := Until(timeout, fw, conditions...) + ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) + lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...) if err != wait.ErrWaitTimeout { t.Fatalf("expected ErrWaitTimeout error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Added { + if lastEvent.Type != watch.Added { t.Fatalf("expected ADDED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -118,30 +120,29 @@ func TestUntilMultipleConditionsFail(t *testing.T) { } func TestUntilTimeout(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) fw.Modify(obj) }() conditions := []ConditionFunc{ - func(event Event) (bool, error) { - return event.Type == Added, nil + func(event watch.Event) (bool, error) { + return event.Type == watch.Added, nil }, - func(event Event) (bool, error) { - return event.Type == Modified, nil + func(event watch.Event) (bool, error) { + return event.Type == watch.Modified, nil }, } - timeout := time.Duration(0) - lastEvent, err := Until(timeout, fw, conditions...) + lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...) if err != nil { t.Fatalf("expected nil error, got %#v", err) } if lastEvent == nil { t.Fatal("expected an event") } - if lastEvent.Type != Modified { + if lastEvent.Type != watch.Modified { t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type) } if got, isPod := lastEvent.Object.(*fakePod); !isPod { @@ -150,19 +151,20 @@ func TestUntilTimeout(t *testing.T) { } func TestUntilErrorCondition(t *testing.T) { - fw := NewFake() + fw := watch.NewFake() go func() { var obj *fakePod fw.Add(obj) }() expected := "something bad" conditions := []ConditionFunc{ - func(event Event) (bool, error) { return event.Type == Added, nil }, - func(event Event) (bool, error) { return false, errors.New(expected) }, + func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil }, + func(event watch.Event) (bool, error) { return false, errors.New(expected) }, } - timeout := time.Minute - _, err := Until(timeout, fw, conditions...) + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + _, err := UntilWithoutRetry(ctx, fw, conditions...) if err == nil { t.Fatal("expected an error") } diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go index 2ed0332642..cc5b58d974 100644 --- a/test/e2e/apps/statefulset.go +++ b/test/e2e/apps/statefulset.go @@ -17,6 +17,7 @@ limitations under the License. package apps import ( + "context" "fmt" "strings" "time" @@ -31,6 +32,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/test/e2e/framework" imageutils "k8s.io/kubernetes/test/utils/image" ) @@ -599,7 +601,9 @@ var _ = SIGDescribe("StatefulSet", func() { By("Verifying that stateful set " + ssName + " was scaled up in order") expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"} - _, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) { + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.StatefulSetTimeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) { if event.Type != watch.Added { return false, nil } @@ -630,7 +634,9 @@ var _ = SIGDescribe("StatefulSet", func() { By("Verifying that stateful set " + ssName + " was scaled down in reverse order") expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"} - _, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) { + ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), framework.StatefulSetTimeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) { if event.Type != watch.Deleted { return false, nil } @@ -736,8 +742,10 @@ var _ = SIGDescribe("StatefulSet", func() { By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name) w, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName})) framework.ExpectNoError(err) + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.StatefulPodTimeout) + defer cancel() // we need to get UID from pod in any state and wait until stateful set controller will remove pod atleast once - _, err = watch.Until(framework.StatefulPodTimeout, w, func(event watch.Event) (bool, error) { + _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { pod := event.Object.(*v1.Pod) switch event.Type { case watch.Deleted: @@ -761,7 +769,7 @@ var _ = SIGDescribe("StatefulSet", func() { framework.ExpectNoError(err) By("Waiting when stateful pod " + statefulPodName + " will be recreated in namespace " + f.Namespace.Name + " and will be in running state") - // we may catch delete event, thats why we are waiting for running phase like this, and not with watch.Until + // we may catch delete event, that's why we are waiting for running phase like this, and not with watchtools.UntilWithoutRetry Eventually(func() error { statefulPod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(statefulPodName, metav1.GetOptions{}) if err != nil { diff --git a/test/e2e/common/init_container.go b/test/e2e/common/init_container.go index 698d48157f..df144994ef 100644 --- a/test/e2e/common/init_container.go +++ b/test/e2e/common/init_container.go @@ -17,6 +17,7 @@ limitations under the License. package common import ( + "context" "fmt" "strconv" "time" @@ -26,6 +27,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/watch" + watchtools "k8s.io/client-go/tools/watch" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/client/conditions" "k8s.io/kubernetes/test/e2e/framework" @@ -90,7 +92,9 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { w, err := podClient.Watch(metav1.SingleObject(startedPod.ObjectMeta)) Expect(err).NotTo(HaveOccurred(), "error watching a pod") wr := watch.NewRecorder(w) - event, err := watch.Until(framework.PodStartTimeout, wr, conditions.PodCompleted) + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) + defer cancel() + event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted) Expect(err).To(BeNil()) framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant) endPod := event.Object.(*v1.Pod) @@ -159,7 +163,9 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { w, err := podClient.Watch(metav1.SingleObject(startedPod.ObjectMeta)) Expect(err).NotTo(HaveOccurred(), "error watching a pod") wr := watch.NewRecorder(w) - event, err := watch.Until(framework.PodStartTimeout, wr, conditions.PodRunning) + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) + defer cancel() + event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning) Expect(err).To(BeNil()) framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant) endPod := event.Object.(*v1.Pod) @@ -230,8 +236,10 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { Expect(err).NotTo(HaveOccurred(), "error watching a pod") wr := watch.NewRecorder(w) - event, err := watch.Until( - framework.PodStartTimeout, wr, + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) + defer cancel() + event, err := watchtools.UntilWithoutRetry( + ctx, wr, // check for the first container to fail at least once func(evt watch.Event) (bool, error) { switch t := evt.Object.(type) { @@ -346,8 +354,10 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() { Expect(err).NotTo(HaveOccurred(), "error watching a pod") wr := watch.NewRecorder(w) - event, err := watch.Until( - framework.PodStartTimeout, wr, + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) + defer cancel() + event, err := watchtools.UntilWithoutRetry( + ctx, wr, // check for the second container to fail at least once func(evt watch.Event) (bool, error) { switch t := evt.Object.(type) { diff --git a/test/e2e/framework/deployment_util.go b/test/e2e/framework/deployment_util.go index f039487ae3..a525d47ecc 100644 --- a/test/e2e/framework/deployment_util.go +++ b/test/e2e/framework/deployment_util.go @@ -17,6 +17,7 @@ limitations under the License. package framework import ( + "context" "fmt" "time" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" scaleclient "k8s.io/client-go/scale" + watchtools "k8s.io/client-go/tools/watch" appsinternal "k8s.io/kubernetes/pkg/apis/apps" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" testutils "k8s.io/kubernetes/test/utils" @@ -173,7 +175,9 @@ func WatchRecreateDeployment(c clientset.Interface, d *apps.Deployment) error { d.Generation <= d.Status.ObservedGeneration, nil } - _, err = watch.Until(2*time.Minute, w, condition) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, condition) if err == wait.ErrWaitTimeout { err = fmt.Errorf("deployment %q never completed: %#v", d.Name, status) } diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 4d7d2b8c52..a9a7c18685 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -66,15 +66,15 @@ import ( "k8s.io/apimachinery/pkg/util/wait" utilyaml "k8s.io/apimachinery/pkg/util/yaml" "k8s.io/apimachinery/pkg/watch" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + scaleclient "k8s.io/client-go/scale" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - - utilfeature "k8s.io/apiserver/pkg/util/feature" - clientset "k8s.io/client-go/kubernetes" - scaleclient "k8s.io/client-go/scale" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/api/legacyscheme" podutil "k8s.io/kubernetes/pkg/api/v1/pod" appsinternal "k8s.io/kubernetes/pkg/apis/apps" @@ -862,7 +862,9 @@ func waitForServiceAccountInNamespace(c clientset.Interface, ns, serviceAccountN if err != nil { return err } - _, err = watch.Until(timeout, w, conditions.ServiceAccountHasSecrets) + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, conditions.ServiceAccountHasSecrets) return err } @@ -1578,7 +1580,9 @@ func WaitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.D if err != nil { return err } - _, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) { + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { switch event.Type { case watch.Deleted: return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "") diff --git a/test/e2e_node/apparmor_test.go b/test/e2e_node/apparmor_test.go index 6c9c713f77..0bf75efe64 100644 --- a/test/e2e_node/apparmor_test.go +++ b/test/e2e_node/apparmor_test.go @@ -18,6 +18,7 @@ package e2e_node import ( "bytes" + "context" "fmt" "io/ioutil" "os" @@ -31,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/security/apparmor" "k8s.io/kubernetes/test/e2e/framework" @@ -151,7 +153,9 @@ func runAppArmorTest(f *framework.Framework, shouldRun bool, profile string) v1. // Pod should remain in the pending state. Wait for the Reason to be set to "AppArmor". w, err := f.PodClient().Watch(metav1.SingleObject(metav1.ObjectMeta{Name: pod.Name})) framework.ExpectNoError(err) - _, err = watch.Until(framework.PodStartTimeout, w, func(e watch.Event) (bool, error) { + ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) { switch e.Type { case watch.Deleted: return false, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, pod.Name) diff --git a/test/integration/auth/rbac_test.go b/test/integration/auth/rbac_test.go index 6f642a5b11..0c386e4d97 100644 --- a/test/integration/auth/rbac_test.go +++ b/test/integration/auth/rbac_test.go @@ -17,6 +17,7 @@ limitations under the License. package auth import ( + "context" "fmt" "io" "io/ioutil" @@ -38,6 +39,7 @@ import ( "k8s.io/apiserver/pkg/registry/generic" externalclientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/client-go/transport" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/testapi" @@ -583,7 +585,9 @@ func TestBootstrapping(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - _, err = watch.Until(30*time.Second, watcher, func(event watch.Event) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) { if event.Type != watch.Added { return false, nil } diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index 4689a7dd68..9bc6f384bf 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -17,6 +17,7 @@ limitations under the License. package quota import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -35,6 +36,7 @@ import ( clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/controller" @@ -161,7 +163,9 @@ func waitForQuota(t *testing.T, quota *v1.ResourceQuota, clientset *clientset.Cl t.Fatalf("unexpected error: %v", err) } - _, err = watch.Until(1*time.Minute, w, func(event watch.Event) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { switch event.Type { case watch.Modified: default: @@ -218,7 +222,9 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) { t.Fatalf("unexpected error: %v", err) } - _, err = watch.Until(3*time.Minute, w, func(event watch.Event) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + _, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) { switch event.Type { case watch.Modified: default: