mirror of https://github.com/k3s-io/k3s
Merge pull request #57504 from yue9944882/fix-fake-client-dummy-watch
Automatic merge from submit-queue. If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. feat(fakeclient): push event on watched channel on add/update/delete **What this PR does / why we need it**: This PR enables watch function for kubernetes [fakeclient](pull/6/head1bcf0b0a22/staging/src/k8s.io/client-go/kubernetes/fake/clientset_generated.go (L88)
). This fake client add watchReactorFunction by wrapping [watch.NewFake](1bcf0b0a22/staging/src/k8s.io/client-go/kubernetes/fake/clientset_generated.go (L98)
) which is a `chan Event` but actually nothing pushes objects into this channel. So all watch function called by fake client will never return or never receive any object. This PR intercepts ReactionFunc of `Create / Update / DeleteActionImpl` and will push the requested object to channel. Which issue(s) this PR fixes (optional, in fixes #<issue number>(, fixes #<issue_number>, ...) format, will close the issue(s) when PR gets merged): Fixes #54075 **Special notes for your reviewer**: **Release note**: ```dev-release-note enable watch function for fake client ```
commit
268555a30a
|
@ -71,7 +71,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -95,7 +95,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package(default_visibility = ["//visibility:public"])
|
||||||
load(
|
load(
|
||||||
"@io_bazel_rules_go//go:def.bzl",
|
"@io_bazel_rules_go//go:def.bzl",
|
||||||
"go_library",
|
"go_library",
|
||||||
|
"go_test",
|
||||||
)
|
)
|
||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
|
@ -28,6 +29,24 @@ go_library(
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = [
|
||||||
|
"fixture_test.go",
|
||||||
|
],
|
||||||
|
embed = [":go_default_library"],
|
||||||
|
importpath = "k8s.io/client-go/testing",
|
||||||
|
deps = [
|
||||||
|
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
filegroup(
|
filegroup(
|
||||||
name = "package-srcs",
|
name = "package-srcs",
|
||||||
srcs = glob(["**"]),
|
srcs = glob(["**"]),
|
||||||
|
|
|
@ -29,6 +29,11 @@ import (
|
||||||
restclient "k8s.io/client-go/rest"
|
restclient "k8s.io/client-go/rest"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// FakeWatchBufferSize is the max num of watch event can be buffered in the
|
||||||
|
// watch channel. Note that when watch event overflows or exceed this buffer
|
||||||
|
// size, manipulations via fake client may be blocked.
|
||||||
|
const FakeWatchBufferSize = 128
|
||||||
|
|
||||||
// ObjectTracker keeps track of objects. It is intended to be used to
|
// ObjectTracker keeps track of objects. It is intended to be used to
|
||||||
// fake calls to a server by returning objects based on their kind,
|
// fake calls to a server by returning objects based on their kind,
|
||||||
// namespace and name.
|
// namespace and name.
|
||||||
|
@ -54,6 +59,10 @@ type ObjectTracker interface {
|
||||||
// didn't exist in the tracker prior to deletion, Delete returns
|
// didn't exist in the tracker prior to deletion, Delete returns
|
||||||
// no error.
|
// no error.
|
||||||
Delete(gvr schema.GroupVersionResource, ns, name string) error
|
Delete(gvr schema.GroupVersionResource, ns, name string) error
|
||||||
|
|
||||||
|
// Watch watches objects from the tracker. Watch returns a channel
|
||||||
|
// which will push added / modified / deleted object.
|
||||||
|
Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectScheme abstracts the implementation of common operations on objects.
|
// ObjectScheme abstracts the implementation of common operations on objects.
|
||||||
|
@ -132,6 +141,13 @@ type tracker struct {
|
||||||
decoder runtime.Decoder
|
decoder runtime.Decoder
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
objects map[schema.GroupVersionResource][]runtime.Object
|
objects map[schema.GroupVersionResource][]runtime.Object
|
||||||
|
// The value type of watchers is a map of which the key is either a namespace or
|
||||||
|
// all/non namespace aka "" and its value is list of fake watchers. Each of
|
||||||
|
// fake watcher holds a buffered channel of size "FakeWatchBufferSize" which
|
||||||
|
// is default to 128. Manipulations on resources will broadcast the notification
|
||||||
|
// events into the watchers' channel and note that too many unhandled event may
|
||||||
|
// potentially block the tracker.
|
||||||
|
watchers map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ ObjectTracker = &tracker{}
|
var _ ObjectTracker = &tracker{}
|
||||||
|
@ -140,9 +156,10 @@ var _ ObjectTracker = &tracker{}
|
||||||
// of objects for the fake clientset. Mostly useful for unit tests.
|
// of objects for the fake clientset. Mostly useful for unit tests.
|
||||||
func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
|
func NewObjectTracker(scheme ObjectScheme, decoder runtime.Decoder) ObjectTracker {
|
||||||
return &tracker{
|
return &tracker{
|
||||||
scheme: scheme,
|
scheme: scheme,
|
||||||
decoder: decoder,
|
decoder: decoder,
|
||||||
objects: make(map[schema.GroupVersionResource][]runtime.Object),
|
objects: make(map[schema.GroupVersionResource][]runtime.Object),
|
||||||
|
watchers: make(map[schema.GroupVersionResource]map[string][]*watch.FakeWatcher),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +202,19 @@ func (t *tracker) List(gvr schema.GroupVersionResource, gvk schema.GroupVersionK
|
||||||
return list.DeepCopyObject(), nil
|
return list.DeepCopyObject(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *tracker) Watch(gvr schema.GroupVersionResource, ns string) (watch.Interface, error) {
|
||||||
|
t.lock.Lock()
|
||||||
|
defer t.lock.Unlock()
|
||||||
|
|
||||||
|
fakewatcher := watch.NewFakeWithChanSize(FakeWatchBufferSize, true)
|
||||||
|
|
||||||
|
if _, exists := t.watchers[gvr]; !exists {
|
||||||
|
t.watchers[gvr] = make(map[string][]*watch.FakeWatcher)
|
||||||
|
}
|
||||||
|
t.watchers[gvr][ns] = append(t.watchers[gvr][ns], fakewatcher)
|
||||||
|
return fakewatcher, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
|
func (t *tracker) Get(gvr schema.GroupVersionResource, ns, name string) (runtime.Object, error) {
|
||||||
errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
|
errNotFound := errors.NewNotFound(gvr.GroupResource(), name)
|
||||||
|
|
||||||
|
@ -263,6 +293,19 @@ func (t *tracker) Update(gvr schema.GroupVersionResource, obj runtime.Object, ns
|
||||||
return t.add(gvr, obj, ns, true)
|
return t.add(gvr, obj, ns, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *tracker) getWatches(gvr schema.GroupVersionResource, ns string) []*watch.FakeWatcher {
|
||||||
|
watches := []*watch.FakeWatcher{}
|
||||||
|
if t.watchers[gvr] != nil {
|
||||||
|
if w := t.watchers[gvr][ns]; w != nil {
|
||||||
|
watches = append(watches, w...)
|
||||||
|
}
|
||||||
|
if w := t.watchers[gvr][""]; w != nil {
|
||||||
|
watches = append(watches, w...)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return watches
|
||||||
|
}
|
||||||
|
|
||||||
func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
|
func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns string, replaceExisting bool) error {
|
||||||
t.lock.Lock()
|
t.lock.Lock()
|
||||||
defer t.lock.Unlock()
|
defer t.lock.Unlock()
|
||||||
|
@ -296,6 +339,9 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st
|
||||||
}
|
}
|
||||||
if oldMeta.GetNamespace() == newMeta.GetNamespace() && oldMeta.GetName() == newMeta.GetName() {
|
if oldMeta.GetNamespace() == newMeta.GetNamespace() && oldMeta.GetName() == newMeta.GetName() {
|
||||||
if replaceExisting {
|
if replaceExisting {
|
||||||
|
for _, w := range t.getWatches(gvr, ns) {
|
||||||
|
w.Modify(obj)
|
||||||
|
}
|
||||||
t.objects[gvr][i] = obj
|
t.objects[gvr][i] = obj
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -310,6 +356,10 @@ func (t *tracker) add(gvr schema.GroupVersionResource, obj runtime.Object, ns st
|
||||||
|
|
||||||
t.objects[gvr] = append(t.objects[gvr], obj)
|
t.objects[gvr] = append(t.objects[gvr], obj)
|
||||||
|
|
||||||
|
for _, w := range t.getWatches(gvr, ns) {
|
||||||
|
w.Add(obj)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -342,7 +392,11 @@ func (t *tracker) Delete(gvr schema.GroupVersionResource, ns, name string) error
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if objMeta.GetNamespace() == ns && objMeta.GetName() == name {
|
if objMeta.GetNamespace() == ns && objMeta.GetName() == name {
|
||||||
|
obj := t.objects[gvr][i]
|
||||||
t.objects[gvr] = append(t.objects[gvr][:i], t.objects[gvr][i+1:]...)
|
t.objects[gvr] = append(t.objects[gvr][:i], t.objects[gvr][i+1:]...)
|
||||||
|
for _, w := range t.getWatches(gvr, ns) {
|
||||||
|
w.Delete(obj)
|
||||||
|
}
|
||||||
found = true
|
found = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,192 @@
|
||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package testing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
runtime "k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
serializer "k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
|
func getArbitraryResource(s schema.GroupVersionResource, name, namespace string) *unstructured.Unstructured {
|
||||||
|
return &unstructured.Unstructured{
|
||||||
|
Object: map[string]interface{}{
|
||||||
|
"kind": s.Resource,
|
||||||
|
"apiVersion": s.Version,
|
||||||
|
"metadata": map[string]interface{}{
|
||||||
|
"name": name,
|
||||||
|
"namespace": namespace,
|
||||||
|
"generateName": "test_generateName",
|
||||||
|
"uid": "test_uid",
|
||||||
|
"resourceVersion": "test_resourceVersion",
|
||||||
|
"selfLink": "test_selfLink",
|
||||||
|
},
|
||||||
|
"data": strconv.Itoa(rand.Int()),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchCallNonNamespace(t *testing.T) {
|
||||||
|
testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"}
|
||||||
|
testObj := getArbitraryResource(testResource, "test_name", "test_namespace")
|
||||||
|
accessor, err := meta.Accessor(testObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
ns := accessor.GetNamespace()
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
codecs := serializer.NewCodecFactory(scheme)
|
||||||
|
o := NewObjectTracker(scheme, codecs.UniversalDecoder())
|
||||||
|
watch, err := o.Watch(testResource, ns)
|
||||||
|
go func() {
|
||||||
|
err := o.Create(testResource, testObj, ns)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("test resource creation failed: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
out := <-watch.ResultChan()
|
||||||
|
assert.Equal(t, testObj, out.Object, "watched object mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchCallAllNamespace(t *testing.T) {
|
||||||
|
testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"}
|
||||||
|
testObj := getArbitraryResource(testResource, "test_name", "test_namespace")
|
||||||
|
accessor, err := meta.Accessor(testObj)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
ns := accessor.GetNamespace()
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
codecs := serializer.NewCodecFactory(scheme)
|
||||||
|
o := NewObjectTracker(scheme, codecs.UniversalDecoder())
|
||||||
|
w, err := o.Watch(testResource, "test_namespace")
|
||||||
|
wAll, err := o.Watch(testResource, "")
|
||||||
|
go func() {
|
||||||
|
err := o.Create(testResource, testObj, ns)
|
||||||
|
assert.NoError(t, err, "test resource creation failed")
|
||||||
|
}()
|
||||||
|
out := <-w.ResultChan()
|
||||||
|
outAll := <-wAll.ResultChan()
|
||||||
|
assert.Equal(t, watch.Added, out.Type, "watch event mismatch")
|
||||||
|
assert.Equal(t, watch.Added, outAll.Type, "watch event mismatch")
|
||||||
|
assert.Equal(t, testObj, out.Object, "watched created object mismatch")
|
||||||
|
assert.Equal(t, testObj, outAll.Object, "watched created object mismatch")
|
||||||
|
go func() {
|
||||||
|
err := o.Update(testResource, testObj, ns)
|
||||||
|
assert.NoError(t, err, "test resource updating failed")
|
||||||
|
}()
|
||||||
|
out = <-w.ResultChan()
|
||||||
|
outAll = <-wAll.ResultChan()
|
||||||
|
assert.Equal(t, watch.Modified, out.Type, "watch event mismatch")
|
||||||
|
assert.Equal(t, watch.Modified, outAll.Type, "watch event mismatch")
|
||||||
|
assert.Equal(t, testObj, out.Object, "watched updated object mismatch")
|
||||||
|
assert.Equal(t, testObj, outAll.Object, "watched updated object mismatch")
|
||||||
|
go func() {
|
||||||
|
err := o.Delete(testResource, "test_namespace", "test_name")
|
||||||
|
assert.NoError(t, err, "test resource deletion failed")
|
||||||
|
}()
|
||||||
|
out = <-w.ResultChan()
|
||||||
|
outAll = <-wAll.ResultChan()
|
||||||
|
assert.Equal(t, watch.Deleted, out.Type, "watch event mismatch")
|
||||||
|
assert.Equal(t, watch.Deleted, outAll.Type, "watch event mismatch")
|
||||||
|
assert.Equal(t, testObj, out.Object, "watched deleted object mismatch")
|
||||||
|
assert.Equal(t, testObj, outAll.Object, "watched deleted object mismatch")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWatchCallMultipleInvocation(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
op watch.EventType
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"foo",
|
||||||
|
watch.Added,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"bar",
|
||||||
|
watch.Added,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"bar",
|
||||||
|
watch.Modified,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"foo",
|
||||||
|
watch.Deleted,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"bar",
|
||||||
|
watch.Deleted,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
scheme := runtime.NewScheme()
|
||||||
|
codecs := serializer.NewCodecFactory(scheme)
|
||||||
|
testResource := schema.GroupVersionResource{Group: "", Version: "test_version", Resource: "test_kind"}
|
||||||
|
|
||||||
|
o := NewObjectTracker(scheme, codecs.UniversalDecoder())
|
||||||
|
watchNamespaces := []string{
|
||||||
|
"",
|
||||||
|
"",
|
||||||
|
"test_namespace",
|
||||||
|
"test_namespace",
|
||||||
|
}
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(len(watchNamespaces))
|
||||||
|
for idx, watchNamespace := range watchNamespaces {
|
||||||
|
i := idx
|
||||||
|
w, err := o.Watch(testResource, watchNamespace)
|
||||||
|
go func() {
|
||||||
|
assert.NoError(t, err, "watch invocation failed")
|
||||||
|
for _, c := range cases {
|
||||||
|
fmt.Printf("%#v %#v\n", c, i)
|
||||||
|
event := <-w.ResultChan()
|
||||||
|
accessor, err := meta.Accessor(event.Object)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
assert.Equal(t, c.op, event.Type, "watch event mismatched")
|
||||||
|
assert.Equal(t, c.name, accessor.GetName(), "watched object mismatch")
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
switch c.op {
|
||||||
|
case watch.Added:
|
||||||
|
obj := getArbitraryResource(testResource, c.name, "test_namespace")
|
||||||
|
o.Create(testResource, obj, "test_namespace")
|
||||||
|
case watch.Modified:
|
||||||
|
obj := getArbitraryResource(testResource, c.name, "test_namespace")
|
||||||
|
o.Update(testResource, obj, "test_namespace")
|
||||||
|
case watch.Deleted:
|
||||||
|
o.Delete(testResource, "test_namespace", c.name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
|
@ -43,7 +43,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,15 @@ func NewSimpleClientset(objects ...runtime.Object) *Clientset {
|
||||||
|
|
||||||
fakePtr := testing.Fake{}
|
fakePtr := testing.Fake{}
|
||||||
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
fakePtr.AddReactor("*", "*", testing.ObjectReaction(o))
|
||||||
fakePtr.AddWatchReactor("*", testing.DefaultWatchReactor(watch.NewFake(), nil))
|
fakePtr.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
|
||||||
|
gvr := action.GetResource()
|
||||||
|
ns := action.GetNamespace()
|
||||||
|
watch, err := o.Watch(gvr, ns)
|
||||||
|
if err != nil {
|
||||||
|
return false, nil, err
|
||||||
|
}
|
||||||
|
return true, watch, nil
|
||||||
|
})
|
||||||
|
|
||||||
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
return &Clientset{fakePtr, &fakediscovery.FakeDiscovery{Fake: &fakePtr}}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue