Merge pull request #66971 from tnozicka/informer-watcher

Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

#50102 Task 2: Add UntilWithSync

**What this PR does / why we need it**:
This is a split off from https://github.com/kubernetes/kubernetes/pull/50102 to go in smaller pieces.

Introduces UntilWithSync based on informer.

**Needs https://github.com/kubernetes/kubernetes/pull/66906 first**
/hold

**Release note**:
```release-note
NONE
```

/priority important-soon
/kind bug
(bug after the main PR which is this split from)
pull/8/head
Kubernetes Submit Queue 2018-08-23 07:26:25 -07:00 committed by GitHub
commit c4f355a2ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1039 additions and 105 deletions

View File

@ -214,7 +214,7 @@ func TestListWatchUntil(t *testing.T) {
}
timeout := 10 * time.Second
lastEvent, err := ListWatchUntil(timeout, listwatch, conditions...)
lastEvent, err := watchtools.ListWatchUntil(timeout, listwatch, conditions...)
if err != nil {
t.Fatalf("expected nil error, got %#v", err)
}

View File

@ -85,6 +85,7 @@ go_library(
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/client-go/util/integer:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",

View File

@ -33,6 +33,7 @@ import (
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/legacyscheme"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/serviceaccount"
@ -122,7 +123,7 @@ func (b SAControllerClientBuilder) Config(name string) (*restclient.Config, erro
return b.CoreClient.Secrets(b.Namespace).Watch(options)
},
}
_, err = cache.ListWatchUntil(30*time.Second, lw,
_, err = watchtools.ListWatchUntil(30*time.Second, lw,
func(event watch.Event) (bool, error) {
switch event.Type {
case watch.Deleted:

View File

@ -2058,10 +2058,6 @@
"ImportPath": "k8s.io/client-go/tools/reference",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/tools/watch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/transport",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -268,3 +268,50 @@ func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
}
}
}
// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
type ProxyWatcher struct {
result chan Event
stopCh chan struct{}
mutex sync.Mutex
stopped bool
}
var _ Interface = &ProxyWatcher{}
// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
func NewProxyWatcher(ch chan Event) *ProxyWatcher {
return &ProxyWatcher{
result: ch,
stopCh: make(chan struct{}),
stopped: false,
}
}
// Stop implements Interface
func (pw *ProxyWatcher) Stop() {
pw.mutex.Lock()
defer pw.mutex.Unlock()
if !pw.stopped {
pw.stopped = true
close(pw.stopCh)
}
}
// Stopping returns true if Stop() has been called
func (pw *ProxyWatcher) Stopping() bool {
pw.mutex.Lock()
defer pw.mutex.Unlock()
return pw.stopped
}
// ResultChan implements Interface
func (pw *ProxyWatcher) ResultChan() <-chan Event {
return pw.result
}
// StopChan returns stop channel
func (pw *ProxyWatcher) StopChan() <-chan struct{} {
return pw.stopCh
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package watch_test
import (
"reflect"
"testing"
"k8s.io/apimachinery/pkg/runtime"
@ -135,3 +136,40 @@ func TestEmpty(t *testing.T) {
t.Errorf("unexpected result channel result")
}
}
func TestProxyWatcher(t *testing.T) {
events := []Event{
{Added, testType("foo")},
{Modified, testType("qux")},
{Modified, testType("bar")},
{Deleted, testType("bar")},
{Error, testType("error: blah")},
}
ch := make(chan Event, len(events))
w := NewProxyWatcher(ch)
for _, e := range events {
ch <- e
}
for _, e := range events {
g := <-w.ResultChan()
if !reflect.DeepEqual(e, g) {
t.Errorf("Expected %#v, got %#v", e, g)
continue
}
}
w.Stop()
select {
// Closed channel always reads immediately
case <-w.StopChan():
default:
t.Error("Channel isn't closed")
}
// Test double close
w.Stop()
}

View File

@ -1782,10 +1782,6 @@
"ImportPath": "k8s.io/client-go/tools/reference",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/tools/watch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/transport",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -81,7 +81,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/pager:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/client-go/util/buffer:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",

View File

@ -20,15 +20,12 @@ import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/pager"
watchtools "k8s.io/client-go/tools/watch"
)
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
@ -113,78 +110,3 @@ func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) {
func (lw *ListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.WatchFunc(options)
}
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
func ListWatchUntil(timeout time.Duration, lw ListerWatcher, conditions ...watchtools.ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
evt, err := watchtools.UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
if err == watchtools.ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}
return evt, err
}

View File

@ -2,26 +2,43 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["until.go"],
srcs = [
"informerwatcher.go",
"until.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/tools/watch",
importpath = "k8s.io/client-go/tools/watch",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["until_test.go"],
srcs = [
"informerwatcher_test.go",
"until_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/github.com/davecgh/go-spew/spew:go_default_library",
],
)

View File

@ -0,0 +1,114 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"sync"
"sync/atomic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
func newTicketer() *ticketer {
return &ticketer{
cond: sync.NewCond(&sync.Mutex{}),
}
}
type ticketer struct {
counter uint64
cond *sync.Cond
current uint64
}
func (t *ticketer) GetTicket() uint64 {
// -1 to start from 0
return atomic.AddUint64(&t.counter, 1) - 1
}
func (t *ticketer) WaitForTicket(ticket uint64, f func()) {
t.cond.L.Lock()
defer t.cond.L.Unlock()
for ticket != t.current {
t.cond.Wait()
}
f()
t.current++
t.cond.Broadcast()
}
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface) {
ch := make(chan watch.Event)
w := watch.NewProxyWatcher(ch)
t := newTicketer()
indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Added,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
})
},
UpdateFunc: func(old, new interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
select {
case ch <- watch.Event{
Type: watch.Modified,
Object: new.(runtime.Object),
}:
case <-w.StopChan():
}
})
},
DeleteFunc: func(obj interface{}) {
go t.WaitForTicket(t.GetTicket(), func() {
staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
if stale {
// We have no means of passing the additional information down using watch API based on watch.Event
// but the caller can filter such objects by checking if metadata.deletionTimestamp is set
obj = staleObj
}
select {
case ch <- watch.Event{
Type: watch.Deleted,
Object: obj.(runtime.Object),
}:
case <-w.StopChan():
}
})
},
}, cache.Indexers{})
go func() {
informer.Run(w.StopChan())
}()
return indexer, informer, w
}

View File

@ -0,0 +1,236 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package watch
import (
"math/rand"
"reflect"
"sort"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/watch"
fakeclientset "k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)
type byEventTypeAndName []watch.Event
func (a byEventTypeAndName) Len() int { return len(a) }
func (a byEventTypeAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a byEventTypeAndName) Less(i, j int) bool {
if a[i].Type < a[j].Type {
return true
}
if a[i].Type > a[j].Type {
return false
}
return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name
}
func TestTicketer(t *testing.T) {
tg := newTicketer()
const numTickets = 100 // current golang limit for race detector is 8192 simultaneously alive goroutines
var tickets []uint64
for i := 0; i < numTickets; i++ {
ticket := tg.GetTicket()
tickets = append(tickets, ticket)
exp, got := uint64(i), ticket
if got != exp {
t.Fatalf("expected ticket %d, got %d", exp, got)
}
}
// shuffle tickets
rand.Shuffle(len(tickets), func(i, j int) {
tickets[i], tickets[j] = tickets[j], tickets[i]
})
res := make(chan uint64, len(tickets))
for _, ticket := range tickets {
go func(ticket uint64) {
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
tg.WaitForTicket(ticket, func() {
res <- ticket
})
}(ticket)
}
for i := 0; i < numTickets; i++ {
exp, got := uint64(i), <-res
if got != exp {
t.Fatalf("expected ticket %d, got %d", exp, got)
}
}
}
func TestNewInformerWatcher(t *testing.T) {
// Make sure there are no 2 same types of events on a secret with the same name or that might be flaky.
tt := []struct {
name string
objects []runtime.Object
events []watch.Event
}{
{
name: "basic test",
objects: []runtime.Object{
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-1",
},
StringData: map[string]string{
"foo-1": "initial",
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
},
StringData: map[string]string{
"foo-2": "initial",
},
},
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
},
StringData: map[string]string{
"foo-3": "initial",
},
},
},
events: []watch.Event{
{
Type: watch.Added,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-4",
},
StringData: map[string]string{
"foo-4": "initial",
},
},
},
{
Type: watch.Modified,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-2",
},
StringData: map[string]string{
"foo-2": "new",
},
},
},
{
Type: watch.Deleted,
Object: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "pod-3",
},
},
},
},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
var expected []watch.Event
for _, o := range tc.objects {
expected = append(expected, watch.Event{
Type: watch.Added,
Object: o.DeepCopyObject(),
})
}
for _, e := range tc.events {
expected = append(expected, *e.DeepCopy())
}
fake := fakeclientset.NewSimpleClientset(tc.objects...)
fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false)
fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil))
for _, e := range tc.events {
fakeWatch.Action(e.Type, e.Object)
}
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return fake.Core().Secrets("").List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fake.Core().Secrets("").Watch(options)
},
}
_, _, w := NewIndexerInformerWatcher(lw, &corev1.Secret{})
var result []watch.Event
loop:
for {
var event watch.Event
var ok bool
select {
case event, ok = <-w.ResultChan():
if !ok {
t.Errorf("Failed to read event: channel is already closed!")
return
}
result = append(result, *event.DeepCopy())
case <-time.After(time.Second * 1):
// All the events are buffered -> this means we are done
// Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event
break loop
}
}
// Informers don't guarantee event order so we need to sort these arrays to compare them
sort.Sort(byEventTypeAndName(expected))
sort.Sort(byEventTypeAndName(result))
if !reflect.DeepEqual(expected, result) {
t.Error(spew.Errorf("\nexpected: %#v,\ngot: %#v,\ndiff: %s", expected, result, diff.ObjectReflectDiff(expected, result)))
return
}
// Fill in some data to test watch closing while there are some events to be read
for _, e := range tc.events {
fakeWatch.Action(e.Type, e.Object)
}
// Stop before reading all the data to make sure the informer can deal with closed channel
w.Stop()
// Wait a bit to see if the informer won't panic
// TODO: Try to figure out a more reliable mechanism than time.Sleep (https://github.com/kubernetes/kubernetes/pull/50102/files#r184716591)
time.Sleep(1 * time.Second)
})
}
}

View File

@ -19,13 +19,22 @@ package watch
import (
"context"
"errors"
"fmt"
"time"
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
// PreconditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition failed or detected an error state.
type PreconditionFunc func(store cache.Store) (bool, error)
// ConditionFunc returns true if the condition has been reached, false if it has not been reached yet,
// or an error if the condition cannot be checked and should terminate. In general, it is better to define
// level driven conditions over edge driven conditions (pod has ready=true, vs pod modified and ready changed
@ -86,6 +95,42 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
return lastEvent, nil
}
// UntilWithSync creates an informer from lw, optionally checks precondition when the store is synced,
// and watches the output until each provided condition succeeds, in a way that is identical
// to function UntilWithoutRetry. (See above.)
// UntilWithSync can deal with all errors like API timeout, lost connections and 'Resource version too old'.
// It is the only function that can recover from 'Resource version too old', Until and UntilWithoutRetry will
// just fail in that case. On the other hand it can't provide you with guarantees as strong as using simple
// Watch method with Until. It can skip some intermediate events in case of watch function failing but it will
// re-list to recover and you always get an event, if there has been a change, after recovery.
// Also with the current implementation based on DeltaFIFO, order of the events you receive is guaranteed only for
// particular object, not between more of them even it's the same resource.
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
// waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher := NewIndexerInformerWatcher(lw, objType)
// Proxy watcher can be stopped multiple times so it's fine to use defer here to cover alternative branches and
// let UntilWithoutRetry to stop it
defer watcher.Stop()
if precondition != nil {
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
return nil, fmt.Errorf("UntilWithSync: unable to sync caches: %v", ctx.Err())
}
done, err := precondition(indexer)
if err != nil {
return nil, err
}
if done {
return nil, nil
}
}
return UntilWithoutRetry(ctx, watcher, conditions...)
}
// ContextWithOptionalTimeout wraps context.WithTimeout and handles infinite timeouts expressed as 0 duration.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 {
@ -100,3 +145,81 @@ func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (
return context.WithTimeout(parent, timeout)
}
// ListWatchUntil checks the provided conditions against the items returned by the list watcher, returning wait.ErrWaitTimeout
// if timeout is exceeded without all conditions returning true, or an error if an error occurs.
// TODO: check for watch expired error and retry watch from latest point? Same issue exists for Until.
// TODO: remove when no longer used
//
// Deprecated: Use UntilWithSync instead.
func ListWatchUntil(timeout time.Duration, lw cache.ListerWatcher, conditions ...ConditionFunc) (*watch.Event, error) {
if len(conditions) == 0 {
return nil, nil
}
list, err := lw.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
initialItems, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
// use the initial items as simulated "adds"
var lastEvent *watch.Event
currIndex := 0
passedConditions := 0
for _, condition := range conditions {
// check the next condition against the previous event and short circuit waiting for the next watch
if lastEvent != nil {
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
continue
}
}
ConditionSucceeded:
for currIndex < len(initialItems) {
lastEvent = &watch.Event{Type: watch.Added, Object: initialItems[currIndex]}
currIndex++
done, err := condition(*lastEvent)
if err != nil {
return lastEvent, err
}
if done {
passedConditions = passedConditions + 1
break ConditionSucceeded
}
}
}
if passedConditions == len(conditions) {
return lastEvent, nil
}
remainingConditions := conditions[passedConditions:]
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
currResourceVersion := metaObj.GetResourceVersion()
watchInterface, err := lw.Watch(metav1.ListOptions{ResourceVersion: currResourceVersion})
if err != nil {
return nil, err
}
ctx, cancel := ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
evt, err := UntilWithoutRetry(ctx, watchInterface, remainingConditions...)
if err == ErrWatchClosed {
// present a consistent error interface to callers
err = wait.ErrWaitTimeout
}
return evt, err
}

View File

@ -19,14 +19,19 @@ package watch
import (
"context"
"errors"
"reflect"
"strings"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
fakeclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"
)
type fakePod struct {
@ -172,3 +177,127 @@ func TestUntilErrorCondition(t *testing.T) {
t.Fatalf("expected %q in error string, got %q", expected, err.Error())
}
}
func TestUntilWithSync(t *testing.T) {
// FIXME: test preconditions
tt := []struct {
name string
lw *cache.ListWatch
preconditionFunc PreconditionFunc
conditionFunc ConditionFunc
expectedErr error
expectedEvent *watch.Event
}{
{
name: "doesn't wait for sync with no precondition",
lw: &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
select {}
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
select {}
},
},
preconditionFunc: nil,
conditionFunc: func(e watch.Event) (bool, error) {
return true, nil
},
expectedErr: errors.New("timed out waiting for the condition"),
expectedEvent: nil,
},
{
name: "waits indefinitely with precondition if it can't sync",
lw: &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
select {}
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
select {}
},
},
preconditionFunc: func(store cache.Store) (bool, error) {
return true, nil
},
conditionFunc: func(e watch.Event) (bool, error) {
return true, nil
},
expectedErr: errors.New("UntilWithSync: unable to sync caches: context deadline exceeded"),
expectedEvent: nil,
},
{
name: "precondition can stop the loop",
lw: func() *cache.ListWatch {
fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return fakeclient.CoreV1().Secrets("").List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fakeclient.CoreV1().Secrets("").Watch(options)
},
}
}(),
preconditionFunc: func(store cache.Store) (bool, error) {
_, exists, err := store.Get(&metav1.ObjectMeta{Namespace: "", Name: "first"})
if err != nil {
return true, err
}
if exists {
return true, nil
}
return false, nil
},
conditionFunc: func(e watch.Event) (bool, error) {
return true, errors.New("should never reach this")
},
expectedErr: nil,
expectedEvent: nil,
},
{
name: "precondition lets it proceed to regular condition",
lw: func() *cache.ListWatch {
fakeclient := fakeclient.NewSimpleClientset(&corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}})
return &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return fakeclient.CoreV1().Secrets("").List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return fakeclient.CoreV1().Secrets("").Watch(options)
},
}
}(),
preconditionFunc: func(store cache.Store) (bool, error) {
return false, nil
},
conditionFunc: func(e watch.Event) (bool, error) {
if e.Type == watch.Added {
return true, nil
}
panic("no other events are expected")
},
expectedErr: nil,
expectedEvent: &watch.Event{Type: watch.Added, Object: &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "first"}}},
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
// Informer waits for caches to sync by polling in 100ms intervals,
// timeout needs to be reasonably higher
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
event, err := UntilWithSync(ctx, tc.lw, &corev1.Secret{}, tc.preconditionFunc, tc.conditionFunc)
if !reflect.DeepEqual(err, tc.expectedErr) {
t.Errorf("expected error %#v, got %#v", tc.expectedErr, err)
}
if !reflect.DeepEqual(event, tc.expectedEvent) {
t.Errorf("expected event %#v, got %#v", tc.expectedEvent, event)
}
})
}
}

View File

@ -22,6 +22,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
],

View File

@ -24,10 +24,11 @@ import (
"encoding/base64"
"encoding/pem"
"fmt"
"github.com/golang/glog"
"reflect"
"time"
"github.com/golang/glog"
certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -38,6 +39,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
certutil "k8s.io/client-go/util/cert"
)
@ -121,7 +123,7 @@ func RequestCertificate(client certificatesclient.CertificateSigningRequestInter
func WaitForCertificate(client certificatesclient.CertificateSigningRequestInterface, req *certificates.CertificateSigningRequest, timeout time.Duration) (certData []byte, err error) {
fieldSelector := fields.OneTermEqualSelector("metadata.name", req.Name).String()
event, err := cache.ListWatchUntil(
event, err := watchtools.ListWatchUntil(
timeout,
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {

View File

@ -1702,10 +1702,6 @@
"ImportPath": "k8s.io/client-go/tools/reference",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/tools/watch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/transport",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -1670,10 +1670,6 @@
"ImportPath": "k8s.io/client-go/tools/reference",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/tools/watch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/transport",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -1066,10 +1066,6 @@
"ImportPath": "k8s.io/client-go/tools/reference",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/tools/watch",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/client-go/transport",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

View File

@ -35,6 +35,7 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//test/integration/apimachinery:all-srcs",
"//test/integration/apiserver:all-srcs",
"//test/integration/auth:all-srcs",
"//test/integration/benchmark/jsonify:all-srcs",

View File

@ -0,0 +1,38 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_test(
name = "go_default_test",
srcs = [
"main_test.go",
"watch_restart_test.go",
],
deps = [
"//pkg/api/testapi:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/tools/watch:go_default_library",
"//test/integration/framework:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,27 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"testing"
"k8s.io/kubernetes/test/integration/framework"
)
func TestMain(m *testing.M) {
framework.EtcdMain(m.Run)
}

View File

@ -0,0 +1,258 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apimachinery
import (
"context"
"fmt"
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/test/integration/framework"
)
func noopNormalization(output []string) []string {
return output
}
func normalizeInformerOutputFunc(initialVal string) func(output []string) []string {
return func(output []string) []string {
result := make([]string, 0, len(output))
// Removes initial value and all of its direct repetitions
lastVal := initialVal
for _, v := range output {
// Make values unique as informer(List+Watch) duplicates some events
if v == lastVal {
continue
}
result = append(result, v)
lastVal = v
}
return result
}
}
func TestWatchRestartsIfTimeoutNotReached(t *testing.T) {
// Has to be longer than 5 seconds
timeout := 2 * time.Minute
// Set up a master
masterConfig := framework.NewIntegrationTestMasterConfig()
// Timeout is set random between MinRequestTimeout and 2x
masterConfig.GenericConfig.MinRequestTimeout = int(timeout.Seconds()) / 4
_, s, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()
config := &restclient.Config{
Host: s.URL,
ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Groups[corev1.GroupName].GroupVersion()},
}
namespaceObject := framework.CreateTestingNamespace("retry-watch", s, t)
defer framework.DeleteTestingNamespace(namespaceObject, s, t)
getListFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) *v1.SecretList {
return func(options metav1.ListOptions) *v1.SecretList {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
res, err := c.CoreV1().Secrets(secret.Namespace).List(options)
if err != nil {
t.Fatalf("Failed to list Secrets: %v", err)
}
return res
}
}
getWatchFunc := func(c *kubernetes.Clientset, secret *v1.Secret) func(options metav1.ListOptions) (watch.Interface, error) {
return func(options metav1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fields.OneTermEqualSelector("metadata.name", secret.Name).String()
res, err := c.CoreV1().Secrets(secret.Namespace).Watch(options)
if err != nil {
t.Fatalf("Failed to create a watcher on Secrets: %v", err)
}
return res, err
}
}
generateEvents := func(t *testing.T, c *kubernetes.Clientset, secret *v1.Secret, referenceOutput *[]string, stopChan chan struct{}, stoppedChan chan struct{}) {
defer close(stoppedChan)
counter := 0
// These 5 seconds are here to protect against a race at the end when we could write something there at the same time as watch.Until ends
softTimeout := timeout - 5*time.Second
if softTimeout < 0 {
panic("Timeout has to be grater than 5 seconds!")
}
endChannel := time.After(softTimeout)
for {
select {
// TODO: get this lower once we figure out how to extend ETCD cache
case <-time.After(1000 * time.Millisecond):
counter = counter + 1
patch := fmt.Sprintf(`{"metadata": {"annotations": {"count": "%d"}}}`, counter)
_, err := c.CoreV1().Secrets(secret.Namespace).Patch(secret.Name, types.StrategicMergePatchType, []byte(patch))
if err != nil {
t.Fatalf("Failed to patch secret: %v", err)
}
*referenceOutput = append(*referenceOutput, fmt.Sprintf("%d", counter))
case <-endChannel:
return
case <-stopChan:
return
}
}
}
initialCount := "0"
newTestSecret := func(name string) *v1.Secret {
return &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespaceObject.Name,
Annotations: map[string]string{
"count": initialCount,
},
},
Data: map[string][]byte{
"data": []byte("value1\n"),
},
}
}
tt := []struct {
name string
succeed bool
secret *v1.Secret
getWatcher func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error)
normalizeOutputFunc func(referenceOutput []string) []string
}{
{
name: "regular watcher should fail",
succeed: false,
secret: newTestSecret("secret-01"),
getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) {
options := metav1.ListOptions{
ResourceVersion: secret.ResourceVersion,
}
return getWatchFunc(c, secret)(options)
}, // regular watcher; unfortunately destined to fail
normalizeOutputFunc: noopNormalization,
},
{
name: "InformerWatcher survives closed watches",
succeed: true,
secret: newTestSecret("secret-03"),
getWatcher: func(c *kubernetes.Clientset, secret *v1.Secret) (watch.Interface, error) {
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return getListFunc(c, secret)(options), nil
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return getWatchFunc(c, secret)(options)
},
}
_, _, w := watchtools.NewIndexerInformerWatcher(lw, &v1.Secret{})
return w, nil
},
normalizeOutputFunc: normalizeInformerOutputFunc(initialCount),
},
}
for _, tmptc := range tt {
tc := tmptc // we need to copy it for parallel runs
t.Run(tc.name, func(t *testing.T) {
c, err := kubernetes.NewForConfig(config)
if err != nil {
t.Fatalf("Failed to create clientset: %v", err)
}
secret, err := c.CoreV1().Secrets(tc.secret.Namespace).Create(tc.secret)
if err != nil {
t.Fatalf("Failed to create testing secret %s/%s: %v", tc.secret.Namespace, tc.secret.Name, err)
}
watcher, err := tc.getWatcher(c, secret)
if err != nil {
t.Fatalf("Failed to create watcher: %v", err)
}
var referenceOutput []string
var output []string
stopChan := make(chan struct{})
stoppedChan := make(chan struct{})
go generateEvents(t, c, secret, &referenceOutput, stopChan, stoppedChan)
// Record current time to be able to asses if the timeout has been reached
startTime := time.Now()
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), timeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
s, ok := event.Object.(*v1.Secret)
if !ok {
t.Fatalf("Received an object that is not a Secret: %#v", event.Object)
}
output = append(output, s.Annotations["count"])
// Watch will never end voluntarily
return false, nil
})
watchDuration := time.Since(startTime)
close(stopChan)
<-stoppedChan
output = tc.normalizeOutputFunc(output)
t.Logf("Watch duration: %v; timeout: %v", watchDuration, timeout)
if err == nil && !tc.succeed {
t.Fatalf("Watch should have timed out but it exited without an error!")
}
if err != wait.ErrWaitTimeout && tc.succeed {
t.Fatalf("Watch exited with error: %v!", err)
}
if watchDuration < timeout && tc.succeed {
t.Fatalf("Watch should have timed out after %v but it timed out prematurely after %v!", timeout, watchDuration)
}
if watchDuration >= timeout && !tc.succeed {
t.Fatalf("Watch should have timed out but it succeeded!")
}
if tc.succeed && !reflect.DeepEqual(referenceOutput, output) {
t.Fatalf("Reference and real output differ! We must have lost some events or read some multiple times!\nRef: %#v\nReal: %#v", referenceOutput, output)
}
})
}
}