mirror of https://github.com/k3s-io/k3s
Rename Until to UntilWithoutRetry and move to using context so it's
cancelablepull/8/head
parent
ccb92f6ef8
commit
3d4a02abb5
|
@ -30,6 +30,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
. "k8s.io/client-go/tools/cache"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
utiltesting "k8s.io/client-go/util/testing"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
|
@ -201,7 +202,7 @@ func TestListWatchUntil(t *testing.T) {
|
|||
watch: fw,
|
||||
}
|
||||
|
||||
conditions := []watch.ConditionFunc{
|
||||
conditions := []watchtools.ConditionFunc{
|
||||
func(event watch.Event) (bool, error) {
|
||||
t.Logf("got %#v", event)
|
||||
return event.Type == watch.Added, nil
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package get
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -36,6 +37,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/rest"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
|
@ -564,9 +566,11 @@ func (o *GetOptions) watch(f cmdutil.Factory, cmd *cobra.Command, args []string)
|
|||
}
|
||||
|
||||
first := true
|
||||
intr := interrupt.New(nil, w.Stop)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
intr := interrupt.New(nil, cancel)
|
||||
intr.Run(func() error {
|
||||
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
|
||||
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
|
||||
if !isList && first {
|
||||
// drop the initial watch event in the single resource case
|
||||
first = false
|
||||
|
|
|
@ -17,12 +17,15 @@ limitations under the License.
|
|||
package rollout
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
|
@ -191,9 +194,13 @@ func (o *RolloutStatusOptions) Run() error {
|
|||
}
|
||||
|
||||
// if the rollout isn't done yet, keep watching deployment status
|
||||
intr := interrupt.New(nil, w.Stop)
|
||||
// TODO: expose timeout
|
||||
timeout := 0 * time.Second
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
intr := interrupt.New(nil, cancel)
|
||||
return intr.Run(func() error {
|
||||
_, err := watch.Until(0, w, func(e watch.Event) (bool, error) {
|
||||
_, err := watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
|
||||
// print deployment's status
|
||||
status, done, err := statusViewer.Status(info.Namespace, info.Name, o.Revision)
|
||||
if err != nil {
|
||||
|
|
|
@ -17,12 +17,13 @@ limitations under the License.
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/docker/distribution/reference"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
@ -34,6 +35,7 @@ import (
|
|||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/kubectl"
|
||||
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
|
||||
|
@ -467,16 +469,19 @@ func (o *RunOptions) removeCreatedObjects(f cmdutil.Factory, createdObjects []*R
|
|||
}
|
||||
|
||||
// waitForPod watches the given pod until the exitCondition is true
|
||||
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watch.ConditionFunc) (*corev1.Pod, error) {
|
||||
func waitForPod(podClient corev1client.PodsGetter, ns, name string, exitCondition watchtools.ConditionFunc) (*corev1.Pod, error) {
|
||||
w, err := podClient.Pods(ns).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: name}))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
intr := interrupt.New(nil, w.Stop)
|
||||
// TODO: expose the timeout
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), 0*time.Second)
|
||||
defer cancel()
|
||||
intr := interrupt.New(nil, cancel)
|
||||
var result *corev1.Pod
|
||||
err = intr.Run(func() error {
|
||||
ev, err := watch.Until(0, w, func(ev watch.Event) (bool, error) {
|
||||
ev, err := watchtools.UntilWithoutRetry(ctx, w, func(ev watch.Event) (bool, error) {
|
||||
return exitCondition(ev)
|
||||
})
|
||||
if ev != nil {
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package wait
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
@ -33,6 +34,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/dynamic"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
|
||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||
"k8s.io/kubernetes/pkg/kubectl/genericclioptions"
|
||||
|
@ -272,11 +274,14 @@ func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error
|
|||
// we're out of time
|
||||
return gottenObj, false, wait.ErrWaitTimeout
|
||||
}
|
||||
watchEvent, err := watch.Until(o.Timeout, objWatch, isDeleted)
|
||||
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
|
||||
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, isDeleted)
|
||||
cancel()
|
||||
switch {
|
||||
case err == nil:
|
||||
return watchEvent.Object, true, nil
|
||||
case err == watch.ErrWatchClosed:
|
||||
case err == watchtools.ErrWatchClosed:
|
||||
continue
|
||||
case err == wait.ErrWaitTimeout:
|
||||
if watchEvent != nil {
|
||||
|
@ -334,11 +339,14 @@ func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (ru
|
|||
// we're out of time
|
||||
return gottenObj, false, wait.ErrWaitTimeout
|
||||
}
|
||||
watchEvent, err := watch.Until(o.Timeout, objWatch, w.isConditionMet)
|
||||
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)
|
||||
watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)
|
||||
cancel()
|
||||
switch {
|
||||
case err == nil:
|
||||
return watchEvent.Object, true, nil
|
||||
case err == watch.ErrWatchClosed:
|
||||
case err == watchtools.ErrWatchClosed:
|
||||
continue
|
||||
case err == wait.ErrWaitTimeout:
|
||||
if watchEvent != nil {
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package polymorphichelpers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
@ -33,6 +34,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
coreclient "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/apis/apps"
|
||||
"k8s.io/kubernetes/pkg/apis/batch"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
|
@ -69,7 +71,10 @@ func GetFirstPod(client coreclient.PodsGetter, namespace string, selector string
|
|||
condition := func(event watch.Event) (bool, error) {
|
||||
return event.Type == watch.Added || event.Type == watch.Modified, nil
|
||||
}
|
||||
event, err := watch.Until(timeout, w, condition)
|
||||
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithoutRetry(ctx, w, condition)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/watch"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/pager"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
)
|
||||
|
||||
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
|
||||
|
@ -116,7 +117,7 @@ func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error)
|
|||
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
|
||||
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
|
||||
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
|
||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch.ConditionFunc) (*watch.Event, error) {
|
||||
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
|
||||
if len(conditions) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -178,8 +179,10 @@ func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watch
|
|||
return nil, err
|
||||
}
|
||||
|
||||
evt, err := watch.Until(timeout, watchInterface, remainingConditions...)
|
||||
if err == watch.ErrWatchClosed {
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
|
||||
if err == watchtools.ErrWatchClosed {
|
||||
// present a consistent error interface to callers
|
||||
err = wait.ErrWaitTimeout
|
||||
}
|
||||
|
|
|
@ -17,38 +17,39 @@ limitations under the License.
|
|||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
|
||||
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
|
||||
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
|
||||
// from false to true).
|
||||
type ConditionFunc func(event Event) (bool, error)
|
||||
type ConditionFunc func(event watch.Event) (bool, error)
|
||||
|
||||
// ErrWatchClosed is returned when the watch channel is closed before timeout in Until.
|
||||
var ErrWatchClosed = errors.New("watch closed before Until timeout")
|
||||
// ErrWatchClosed is returned when the watch channel is closed before timeout in UntilWithoutRetry.
|
||||
var ErrWatchClosed = errors.New("watch closed before UntilWithoutRetry timeout")
|
||||
|
||||
// Until reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||
// UntilWithoutRetry reads items from the watch until each provided condition succeeds, and then returns the last watch
|
||||
// encountered. The first condition that returns an error terminates the watch (and the event is also returned).
|
||||
// If no event has been received, the returned event will be nil.
|
||||
// Conditions are satisfied sequentially so as to provide a useful primitive for higher level composition.
|
||||
// A zero timeout means to wait forever.
|
||||
func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc) (*Event, error) {
|
||||
// Waits until context deadline or until context is canceled.
|
||||
//
|
||||
// Warning: Unless you have a very specific use case (probably a special Watcher) don't use this function!!!
|
||||
// Warning: This will fail e.g. on API timeouts and/or 'too old resource version' error.
|
||||
// Warning: You are most probably looking for a function *Until* or *UntilWithSync* below,
|
||||
// Warning: solving such issues.
|
||||
// TODO: Consider making this function private to prevent misuse when the other occurrences in our codebase are gone.
|
||||
func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions ...ConditionFunc) (*watch.Event, error) {
|
||||
ch := watcher.ResultChan()
|
||||
defer watcher.Stop()
|
||||
var after <-chan time.Time
|
||||
if timeout > 0 {
|
||||
after = time.After(timeout)
|
||||
} else {
|
||||
ch := make(chan time.Time)
|
||||
defer close(ch)
|
||||
after = ch
|
||||
}
|
||||
var lastEvent *Event
|
||||
var lastEvent *watch.Event
|
||||
for _, condition := range conditions {
|
||||
// check the next condition against the previous event and short circuit waiting for the next watch
|
||||
if lastEvent != nil {
|
||||
|
@ -69,7 +70,6 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
|||
}
|
||||
lastEvent = &event
|
||||
|
||||
// TODO: check for watch expired error and retry watch from latest point?
|
||||
done, err := condition(event)
|
||||
if err != nil {
|
||||
return lastEvent, err
|
||||
|
@ -78,10 +78,25 @@ func Until(timeout time.Duration, watcher Interface, conditions ...ConditionFunc
|
|||
break ConditionSucceeded
|
||||
}
|
||||
|
||||
case <-after:
|
||||
case <-ctx.Done():
|
||||
return lastEvent, wait.ErrWaitTimeout
|
||||
}
|
||||
}
|
||||
}
|
||||
return lastEvent, nil
|
||||
}
|
||||
|
||||
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
|
||||
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
|
||||
if timeout < 0 {
|
||||
// This should be handled in validation
|
||||
glog.Errorf("Timeout for context shall not be negative!")
|
||||
timeout = 0
|
||||
}
|
||||
|
||||
if timeout == 0 {
|
||||
return context.WithCancel(parent)
|
||||
}
|
||||
|
||||
return context.WithTimeout(parent, timeout)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
@ -25,6 +26,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
type fakePod struct {
|
||||
|
@ -35,26 +37,26 @@ func (obj *fakePod) GetObjectKind() schema.ObjectKind { return schema.EmptyObjec
|
|||
func (obj *fakePod) DeepCopyObject() runtime.Object { panic("DeepCopyObject not supported by fakePod") }
|
||||
|
||||
func TestUntil(t *testing.T) {
|
||||
fw := NewFake()
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
var obj *fakePod
|
||||
fw.Add(obj)
|
||||
fw.Modify(obj)
|
||||
}()
|
||||
conditions := []ConditionFunc{
|
||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
||||
func(event Event) (bool, error) { return event.Type == Modified, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Modified, nil },
|
||||
}
|
||||
|
||||
timeout := time.Minute
|
||||
lastEvent, err := Until(timeout, fw, conditions...)
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Minute)
|
||||
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil error, got %#v", err)
|
||||
}
|
||||
if lastEvent == nil {
|
||||
t.Fatal("expected an event")
|
||||
}
|
||||
if lastEvent.Type != Modified {
|
||||
if lastEvent.Type != watch.Modified {
|
||||
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||
}
|
||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||
|
@ -63,25 +65,25 @@ func TestUntil(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUntilMultipleConditions(t *testing.T) {
|
||||
fw := NewFake()
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
var obj *fakePod
|
||||
fw.Add(obj)
|
||||
}()
|
||||
conditions := []ConditionFunc{
|
||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||
}
|
||||
|
||||
timeout := time.Minute
|
||||
lastEvent, err := Until(timeout, fw, conditions...)
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Minute)
|
||||
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil error, got %#v", err)
|
||||
}
|
||||
if lastEvent == nil {
|
||||
t.Fatal("expected an event")
|
||||
}
|
||||
if lastEvent.Type != Added {
|
||||
if lastEvent.Type != watch.Added {
|
||||
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||
}
|
||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||
|
@ -90,26 +92,26 @@ func TestUntilMultipleConditions(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUntilMultipleConditionsFail(t *testing.T) {
|
||||
fw := NewFake()
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
var obj *fakePod
|
||||
fw.Add(obj)
|
||||
}()
|
||||
conditions := []ConditionFunc{
|
||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
||||
func(event Event) (bool, error) { return event.Type == Deleted, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Deleted, nil },
|
||||
}
|
||||
|
||||
timeout := 10 * time.Second
|
||||
lastEvent, err := Until(timeout, fw, conditions...)
|
||||
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
lastEvent, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||
if err != wait.ErrWaitTimeout {
|
||||
t.Fatalf("expected ErrWaitTimeout error, got %#v", err)
|
||||
}
|
||||
if lastEvent == nil {
|
||||
t.Fatal("expected an event")
|
||||
}
|
||||
if lastEvent.Type != Added {
|
||||
if lastEvent.Type != watch.Added {
|
||||
t.Fatalf("expected ADDED event type, got %v", lastEvent.Type)
|
||||
}
|
||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||
|
@ -118,30 +120,29 @@ func TestUntilMultipleConditionsFail(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUntilTimeout(t *testing.T) {
|
||||
fw := NewFake()
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
var obj *fakePod
|
||||
fw.Add(obj)
|
||||
fw.Modify(obj)
|
||||
}()
|
||||
conditions := []ConditionFunc{
|
||||
func(event Event) (bool, error) {
|
||||
return event.Type == Added, nil
|
||||
func(event watch.Event) (bool, error) {
|
||||
return event.Type == watch.Added, nil
|
||||
},
|
||||
func(event Event) (bool, error) {
|
||||
return event.Type == Modified, nil
|
||||
func(event watch.Event) (bool, error) {
|
||||
return event.Type == watch.Modified, nil
|
||||
},
|
||||
}
|
||||
|
||||
timeout := time.Duration(0)
|
||||
lastEvent, err := Until(timeout, fw, conditions...)
|
||||
lastEvent, err := UntilWithoutRetry(context.Background(), fw, conditions...)
|
||||
if err != nil {
|
||||
t.Fatalf("expected nil error, got %#v", err)
|
||||
}
|
||||
if lastEvent == nil {
|
||||
t.Fatal("expected an event")
|
||||
}
|
||||
if lastEvent.Type != Modified {
|
||||
if lastEvent.Type != watch.Modified {
|
||||
t.Fatalf("expected MODIFIED event type, got %v", lastEvent.Type)
|
||||
}
|
||||
if got, isPod := lastEvent.Object.(*fakePod); !isPod {
|
||||
|
@ -150,19 +151,20 @@ func TestUntilTimeout(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestUntilErrorCondition(t *testing.T) {
|
||||
fw := NewFake()
|
||||
fw := watch.NewFake()
|
||||
go func() {
|
||||
var obj *fakePod
|
||||
fw.Add(obj)
|
||||
}()
|
||||
expected := "something bad"
|
||||
conditions := []ConditionFunc{
|
||||
func(event Event) (bool, error) { return event.Type == Added, nil },
|
||||
func(event Event) (bool, error) { return false, errors.New(expected) },
|
||||
func(event watch.Event) (bool, error) { return event.Type == watch.Added, nil },
|
||||
func(event watch.Event) (bool, error) { return false, errors.New(expected) },
|
||||
}
|
||||
|
||||
timeout := time.Minute
|
||||
_, err := Until(timeout, fw, conditions...)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
|
||||
defer cancel()
|
||||
_, err := UntilWithoutRetry(ctx, fw, conditions...)
|
||||
if err == nil {
|
||||
t.Fatal("expected an error")
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package apps
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
)
|
||||
|
@ -599,7 +601,9 @@ var _ = SIGDescribe("StatefulSet", func() {
|
|||
|
||||
By("Verifying that stateful set " + ssName + " was scaled up in order")
|
||||
expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"}
|
||||
_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.StatefulSetTimeout)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
|
||||
if event.Type != watch.Added {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -630,7 +634,9 @@ var _ = SIGDescribe("StatefulSet", func() {
|
|||
|
||||
By("Verifying that stateful set " + ssName + " was scaled down in reverse order")
|
||||
expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"}
|
||||
_, err = watch.Until(framework.StatefulSetTimeout, watcher, func(event watch.Event) (bool, error) {
|
||||
ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), framework.StatefulSetTimeout)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
|
||||
if event.Type != watch.Deleted {
|
||||
return false, nil
|
||||
}
|
||||
|
@ -736,8 +742,10 @@ var _ = SIGDescribe("StatefulSet", func() {
|
|||
By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name)
|
||||
w, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName}))
|
||||
framework.ExpectNoError(err)
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.StatefulPodTimeout)
|
||||
defer cancel()
|
||||
// we need to get UID from pod in any state and wait until stateful set controller will remove pod atleast once
|
||||
_, err = watch.Until(framework.StatefulPodTimeout, w, func(event watch.Event) (bool, error) {
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
|
||||
pod := event.Object.(*v1.Pod)
|
||||
switch event.Type {
|
||||
case watch.Deleted:
|
||||
|
@ -761,7 +769,7 @@ var _ = SIGDescribe("StatefulSet", func() {
|
|||
framework.ExpectNoError(err)
|
||||
|
||||
By("Waiting when stateful pod " + statefulPodName + " will be recreated in namespace " + f.Namespace.Name + " and will be in running state")
|
||||
// we may catch delete event, thats why we are waiting for running phase like this, and not with watch.Until
|
||||
// we may catch delete event, that's why we are waiting for running phase like this, and not with watchtools.UntilWithoutRetry
|
||||
Eventually(func() error {
|
||||
statefulPod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(statefulPodName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -26,6 +27,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/client/conditions"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
@ -90,7 +92,9 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
|
|||
w, err := podClient.Watch(metav1.SingleObject(startedPod.ObjectMeta))
|
||||
Expect(err).NotTo(HaveOccurred(), "error watching a pod")
|
||||
wr := watch.NewRecorder(w)
|
||||
event, err := watch.Until(framework.PodStartTimeout, wr, conditions.PodCompleted)
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodCompleted)
|
||||
Expect(err).To(BeNil())
|
||||
framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant)
|
||||
endPod := event.Object.(*v1.Pod)
|
||||
|
@ -159,7 +163,9 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
|
|||
w, err := podClient.Watch(metav1.SingleObject(startedPod.ObjectMeta))
|
||||
Expect(err).NotTo(HaveOccurred(), "error watching a pod")
|
||||
wr := watch.NewRecorder(w)
|
||||
event, err := watch.Until(framework.PodStartTimeout, wr, conditions.PodRunning)
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithoutRetry(ctx, wr, conditions.PodRunning)
|
||||
Expect(err).To(BeNil())
|
||||
framework.CheckInvariants(wr.Events(), framework.ContainerInitInvariant)
|
||||
endPod := event.Object.(*v1.Pod)
|
||||
|
@ -230,8 +236,10 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
|
|||
Expect(err).NotTo(HaveOccurred(), "error watching a pod")
|
||||
|
||||
wr := watch.NewRecorder(w)
|
||||
event, err := watch.Until(
|
||||
framework.PodStartTimeout, wr,
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithoutRetry(
|
||||
ctx, wr,
|
||||
// check for the first container to fail at least once
|
||||
func(evt watch.Event) (bool, error) {
|
||||
switch t := evt.Object.(type) {
|
||||
|
@ -346,8 +354,10 @@ var _ = framework.KubeDescribe("InitContainer [NodeConformance]", func() {
|
|||
Expect(err).NotTo(HaveOccurred(), "error watching a pod")
|
||||
|
||||
wr := watch.NewRecorder(w)
|
||||
event, err := watch.Until(
|
||||
framework.PodStartTimeout, wr,
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
|
||||
defer cancel()
|
||||
event, err := watchtools.UntilWithoutRetry(
|
||||
ctx, wr,
|
||||
// check for the second container to fail at least once
|
||||
func(evt watch.Event) (bool, error) {
|
||||
switch t := evt.Object.(type) {
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package framework
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
|
@ -30,6 +31,7 @@ import (
|
|||
"k8s.io/apimachinery/pkg/watch"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
scaleclient "k8s.io/client-go/scale"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
|
||||
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
|
||||
testutils "k8s.io/kubernetes/test/utils"
|
||||
|
@ -173,7 +175,9 @@ func WatchRecreateDeployment(c clientset.Interface, d *apps.Deployment) error {
|
|||
d.Generation <= d.Status.ObservedGeneration, nil
|
||||
}
|
||||
|
||||
_, err = watch.Until(2*time.Minute, w, condition)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, condition)
|
||||
if err == wait.ErrWaitTimeout {
|
||||
err = fmt.Errorf("deployment %q never completed: %#v", d.Name, status)
|
||||
}
|
||||
|
|
|
@ -66,15 +66,15 @@ import (
|
|||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
utilyaml "k8s.io/apimachinery/pkg/util/yaml"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/dynamic"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
scaleclient "k8s.io/client-go/scale"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
scaleclient "k8s.io/client-go/scale"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
appsinternal "k8s.io/kubernetes/pkg/apis/apps"
|
||||
|
@ -862,7 +862,9 @@ func waitForServiceAccountInNamespace(c clientset.Interface, ns, serviceAccountN
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = watch.Until(timeout, w, conditions.ServiceAccountHasSecrets)
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, conditions.ServiceAccountHasSecrets)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1578,7 +1580,9 @@ func WaitForRCToStabilize(c clientset.Interface, ns, name string, timeout time.D
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = watch.Until(timeout, w, func(event watch.Event) (bool, error) {
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Deleted:
|
||||
return false, apierrs.NewNotFound(schema.GroupResource{Resource: "replicationcontrollers"}, "")
|
||||
|
|
|
@ -18,6 +18,7 @@ package e2e_node
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -31,6 +32,7 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/security/apparmor"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
|
||||
|
@ -151,7 +153,9 @@ func runAppArmorTest(f *framework.Framework, shouldRun bool, profile string) v1.
|
|||
// Pod should remain in the pending state. Wait for the Reason to be set to "AppArmor".
|
||||
w, err := f.PodClient().Watch(metav1.SingleObject(metav1.ObjectMeta{Name: pod.Name}))
|
||||
framework.ExpectNoError(err)
|
||||
_, err = watch.Until(framework.PodStartTimeout, w, func(e watch.Event) (bool, error) {
|
||||
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), framework.PodStartTimeout)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, func(e watch.Event) (bool, error) {
|
||||
switch e.Type {
|
||||
case watch.Deleted:
|
||||
return false, errors.NewNotFound(schema.GroupResource{Resource: "pods"}, pod.Name)
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package auth
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
|
@ -38,6 +39,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
externalclientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/client-go/transport"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
|
@ -583,7 +585,9 @@ func TestBootstrapping(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
_, err = watch.Until(30*time.Second, watcher, func(event watch.Event) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
|
||||
if event.Type != watch.Added {
|
||||
return false, nil
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||
package quota
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
|
@ -35,6 +36,7 @@ import (
|
|||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/record"
|
||||
watchtools "k8s.io/client-go/tools/watch"
|
||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||
internalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
|
@ -161,7 +163,9 @@ func waitForQuota(t *testing.T, quota *v1.ResourceQuota, clientset *clientset.Cl
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = watch.Until(1*time.Minute, w, func(event watch.Event) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Modified:
|
||||
default:
|
||||
|
@ -218,7 +222,9 @@ func scale(t *testing.T, namespace string, clientset *clientset.Clientset) {
|
|||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
_, err = watch.Until(3*time.Minute, w, func(event watch.Event) (bool, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||
defer cancel()
|
||||
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
|
||||
switch event.Type {
|
||||
case watch.Modified:
|
||||
default:
|
||||
|
|
Loading…
Reference in New Issue