Merge pull request #69308 from p0lyn0mial/dynamic_informer

dynamic informer factory
pull/58/head
k8s-ci-robot 2018-10-11 02:24:13 -07:00 committed by GitHub
commit effd009dab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 492 additions and 0 deletions

View File

@ -59,6 +59,7 @@ filegroup(
name = "all-srcs", name = "all-srcs",
srcs = [ srcs = [
":package-srcs", ":package-srcs",
"//staging/src/k8s.io/client-go/dynamic/dynamicinformer:all-srcs",
"//staging/src/k8s.io/client-go/dynamic/dynamiclister:all-srcs", "//staging/src/k8s.io/client-go/dynamic/dynamiclister:all-srcs",
"//staging/src/k8s.io/client-go/dynamic/fake:all-srcs", "//staging/src/k8s.io/client-go/dynamic/fake:all-srcs",
], ],

View File

@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"informer.go",
"interface.go",
],
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/dynamic/dynamicinformer",
importpath = "k8s.io/client-go/dynamic/dynamicinformer",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/client-go/dynamic:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/dynamiclister:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["informer_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/client-go/dynamic/fake:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,155 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicinformer
import (
"sync"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamiclister"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
// NewDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory for all namespaces.
func NewDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration) DynamicSharedInformerFactory {
return NewFilteredDynamicSharedInformerFactory(client, defaultResync, metav1.NamespaceAll, nil)
}
// NewFilteredDynamicSharedInformerFactory constructs a new instance of dynamicSharedInformerFactory.
// Listers obtained via this factory will be subject to the same filters as specified here.
func NewFilteredDynamicSharedInformerFactory(client dynamic.Interface, defaultResync time.Duration, namespace string, tweakListOptions TweakListOptionsFunc) DynamicSharedInformerFactory {
return &dynamicSharedInformerFactory{
client: client,
defaultResync: defaultResync,
namespace: metav1.NamespaceAll,
informers: map[schema.GroupVersionResource]informers.GenericInformer{},
startedInformers: make(map[schema.GroupVersionResource]bool),
}
}
type dynamicSharedInformerFactory struct {
client dynamic.Interface
defaultResync time.Duration
namespace string
lock sync.Mutex
informers map[schema.GroupVersionResource]informers.GenericInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[schema.GroupVersionResource]bool
}
var _ DynamicSharedInformerFactory = &dynamicSharedInformerFactory{}
func (f *dynamicSharedInformerFactory) ForResource(gvr schema.GroupVersionResource) informers.GenericInformer {
f.lock.Lock()
defer f.lock.Unlock()
key := gvr
informer, exists := f.informers[key]
if exists {
return informer
}
informer = NewFilteredDynamicInformer(f.client, gvr, f.namespace, f.defaultResync, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil)
f.informers[key] = informer
return informer
}
// Start initializes all requested informers.
func (f *dynamicSharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Informer().Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *dynamicSharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool {
informers := func() map[schema.GroupVersionResource]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[schema.GroupVersionResource]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer.Informer()
}
}
return informers
}()
res := map[schema.GroupVersionResource]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// NewFilteredDynamicInformer constructs a new informer for a dynamic type.
func NewFilteredDynamicInformer(client dynamic.Interface, gvr schema.GroupVersionResource, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions TweakListOptionsFunc) informers.GenericInformer {
return &dynamicInformer{
gvr: gvr,
informer: cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.Resource(gvr).Namespace(namespace).List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.Resource(gvr).Namespace(namespace).Watch(options)
},
},
&unstructured.Unstructured{},
resyncPeriod,
indexers,
),
}
}
type dynamicInformer struct {
informer cache.SharedIndexInformer
gvr schema.GroupVersionResource
}
var _ informers.GenericInformer = &dynamicInformer{}
func (d *dynamicInformer) Informer() cache.SharedIndexInformer {
return d.informer
}
func (d *dynamicInformer) Lister() cache.GenericLister {
return dynamiclister.NewRuntimeObjectShim(dynamiclister.New(d.informer.GetIndexer(), d.gvr))
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicinformer_test
import (
"context"
"testing"
"time"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/tools/cache"
)
func TestDynamicSharedInformerFactory(t *testing.T) {
scenarios := []struct {
name string
existingObj *unstructured.Unstructured
gvr schema.GroupVersionResource
ns string
trigger func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured
handler func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs
}{
// scenario 1
{
name: "scenario 1: test if adding an object triggers AddFunc",
ns: "ns-foo",
gvr: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"},
trigger: func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, _ *unstructured.Unstructured) *unstructured.Unstructured {
testObject := newUnstructured("extensions/v1beta1", "Deployment", "ns-foo", "name-foo")
createdObj, err := fakeClient.Resource(gvr).Namespace(ns).Create(testObject, metav1.CreateOptions{})
if err != nil {
t.Error(err)
}
return createdObj
},
handler: func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
rcvCh <- obj.(*unstructured.Unstructured)
},
}
},
},
// scenario 2
{
name: "scenario 2: tests if updating an object triggers UpdateFunc",
ns: "ns-foo",
gvr: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"},
existingObj: newUnstructured("extensions/v1beta1", "Deployment", "ns-foo", "name-foo"),
trigger: func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured {
testObject.Object["spec"] = "updatedName"
updatedObj, err := fakeClient.Resource(gvr).Namespace(ns).Update(testObject, metav1.UpdateOptions{})
if err != nil {
t.Error(err)
}
return updatedObj
},
handler: func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, updated interface{}) {
rcvCh <- updated.(*unstructured.Unstructured)
},
}
},
},
// scenario 3
{
name: "scenario 3: test if deleting an object triggers DeleteFunc",
ns: "ns-foo",
gvr: schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"},
existingObj: newUnstructured("extensions/v1beta1", "Deployment", "ns-foo", "name-foo"),
trigger: func(gvr schema.GroupVersionResource, ns string, fakeClient *fake.FakeDynamicClient, testObject *unstructured.Unstructured) *unstructured.Unstructured {
err := fakeClient.Resource(gvr).Namespace(ns).Delete(testObject.GetName(), &metav1.DeleteOptions{})
if err != nil {
t.Error(err)
}
return testObject
},
handler: func(rcvCh chan<- *unstructured.Unstructured) *cache.ResourceEventHandlerFuncs {
return &cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
rcvCh <- obj.(*unstructured.Unstructured)
},
}
},
},
}
for _, ts := range scenarios {
t.Run(ts.name, func(t *testing.T) {
// test data
timeout := time.Duration(3 * time.Second)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
scheme := runtime.NewScheme()
informerReciveObjectCh := make(chan *unstructured.Unstructured, 1)
objs := []runtime.Object{}
if ts.existingObj != nil {
objs = append(objs, ts.existingObj)
}
fakeClient := fake.NewSimpleDynamicClient(scheme, objs...)
target := dynamicinformer.NewDynamicSharedInformerFactory(fakeClient, 0)
// act
informerListerForGvr := target.ForResource(ts.gvr)
informerListerForGvr.Informer().AddEventHandler(ts.handler(informerReciveObjectCh))
target.Start(ctx.Done())
if synced := target.WaitForCacheSync(ctx.Done()); !synced[ts.gvr] {
t.Errorf("informer for %s hasn't synced", ts.gvr)
}
testObject := ts.trigger(ts.gvr, ts.ns, fakeClient, ts.existingObj)
select {
case objFromInformer := <-informerReciveObjectCh:
if !equality.Semantic.DeepEqual(testObject, objFromInformer) {
t.Fatalf("%v", diff.ObjectDiff(testObject, objFromInformer))
}
case <-ctx.Done():
t.Errorf("tested informer haven't received an object, waited %v", timeout)
}
})
}
}
func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiVersion,
"kind": kind,
"metadata": map[string]interface{}{
"namespace": namespace,
"name": name,
},
"spec": name,
},
}
}

View File

@ -0,0 +1,34 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamicinformer
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
)
// DynamicSharedInformerFactory provides access to a shared informer and lister for dynamic client
type DynamicSharedInformerFactory interface {
Start(stopCh <-chan struct{})
ForResource(gvr schema.GroupVersionResource) informers.GenericInformer
WaitForCacheSync(stopCh <-chan struct{}) map[schema.GroupVersionResource]bool
}
// TweakListOptionsFunc defines the signature of a helper function
// that wants to provide more listing options to API
type TweakListOptionsFunc func(*metav1.ListOptions)

View File

@ -5,6 +5,7 @@ go_library(
srcs = [ srcs = [
"interface.go", "interface.go",
"lister.go", "lister.go",
"shim.go",
], ],
importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/dynamic/dynamiclister", importmap = "k8s.io/kubernetes/vendor/k8s.io/client-go/dynamic/dynamiclister",
importpath = "k8s.io/client-go/dynamic/dynamiclister", importpath = "k8s.io/client-go/dynamic/dynamiclister",
@ -13,6 +14,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library", "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
], ],

View File

@ -0,0 +1,87 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamiclister
import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
var _ cache.GenericLister = &dynamicListerShim{}
var _ cache.GenericNamespaceLister = &dynamicNamespaceListerShim{}
// dynamicListerShim implements the cache.GenericLister interface.
type dynamicListerShim struct {
lister Lister
}
// NewRuntimeObjectShim returns a new shim for Lister.
// It wraps Lister so that it implements cache.GenericLister interface
func NewRuntimeObjectShim(lister Lister) cache.GenericLister {
return &dynamicListerShim{lister: lister}
}
// List will return all objects across namespaces
func (s *dynamicListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) {
objs, err := s.lister.List(selector)
if err != nil {
return nil, err
}
ret = make([]runtime.Object, len(objs))
for index, obj := range objs {
ret[index] = obj
}
return ret, err
}
// Get will attempt to retrieve assuming that name==key
func (s *dynamicListerShim) Get(name string) (runtime.Object, error) {
return s.lister.Get(name)
}
func (s *dynamicListerShim) ByNamespace(namespace string) cache.GenericNamespaceLister {
return &dynamicNamespaceListerShim{
namespaceLister: s.lister.Namespace(namespace),
}
}
// dynamicNamespaceListerShim implements the NamespaceLister interface.
// It wraps NamespaceLister so that it implements cache.GenericNamespaceLister interface
type dynamicNamespaceListerShim struct {
namespaceLister NamespaceLister
}
// List will return all objects in this namespace
func (ns *dynamicNamespaceListerShim) List(selector labels.Selector) (ret []runtime.Object, err error) {
objs, err := ns.namespaceLister.List(selector)
if err != nil {
return nil, err
}
ret = make([]runtime.Object, len(objs))
for index, obj := range objs {
ret[index] = obj
}
return ret, err
}
// Get will attempt to retrieve by namespace and name
func (ns *dynamicNamespaceListerShim) Get(name string) (runtime.Object, error) {
return ns.namespaceLister.Get(name)
}