Prevent quota controller using unsynced listers

k3s-v1.15.3
Jordan Liggitt 2019-03-13 18:49:56 -07:00
parent 8146fe47d8
commit ec7a04bd20
3 changed files with 212 additions and 6 deletions

View File

@ -1,9 +1,6 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@ -38,3 +35,14 @@ filegroup(
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["evaluator_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
],
)

View File

@ -18,6 +18,7 @@ package generic
import (
"fmt"
"sync/atomic"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@ -33,17 +34,83 @@ import (
// InformerForResourceFunc knows how to provision an informer
type InformerForResourceFunc func(schema.GroupVersionResource) (informers.GenericInformer, error)
// ListerFuncForResourceFunc knows how to provision a lister from an informer func
// ListerFuncForResourceFunc knows how to provision a lister from an informer func.
// The lister returns errors until the informer has synced.
func ListerFuncForResourceFunc(f InformerForResourceFunc) quota.ListerForResourceFunc {
return func(gvr schema.GroupVersionResource) (cache.GenericLister, error) {
informer, err := f(gvr)
if err != nil {
return nil, err
}
return informer.Lister(), nil
return &protectedLister{
hasSynced: cachedHasSynced(informer.Informer().HasSynced),
notReadyErr: fmt.Errorf("%v not yet synced", gvr),
delegate: informer.Lister(),
}, nil
}
}
// cachedHasSynced returns a function that calls hasSynced() until it returns true once, then returns true
func cachedHasSynced(hasSynced func() bool) func() bool {
cache := &atomic.Value{}
cache.Store(false)
return func() bool {
if cache.Load().(bool) {
// short-circuit if already synced
return true
}
if hasSynced() {
// remember we synced
cache.Store(true)
return true
}
return false
}
}
// protectedLister returns notReadyError if hasSynced returns false, otherwise delegates to delegate
type protectedLister struct {
hasSynced func() bool
notReadyErr error
delegate cache.GenericLister
}
func (p *protectedLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.List(selector)
}
func (p *protectedLister) Get(name string) (runtime.Object, error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.Get(name)
}
func (p *protectedLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
return &protectedNamespaceLister{p.hasSynced, p.notReadyErr, p.delegate.ByNamespace(namespace)}
}
// protectedNamespaceLister returns notReadyError if hasSynced returns false, otherwise delegates to delegate
type protectedNamespaceLister struct {
hasSynced func() bool
notReadyErr error
delegate cache.GenericNamespaceLister
}
func (p *protectedNamespaceLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.List(selector)
}
func (p *protectedNamespaceLister) Get(name string) (runtime.Object, error) {
if !p.hasSynced() {
return nil, p.notReadyErr
}
return p.delegate.Get(name)
}
// ListResourceUsingListerFunc returns a listing function based on the shared informer factory for the specified resource.
func ListResourceUsingListerFunc(l quota.ListerForResourceFunc, resource schema.GroupVersionResource) ListFuncByNamespace {
return func(namespace string) ([]runtime.Object, error) {

View File

@ -0,0 +1,131 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package generic
import (
"errors"
"testing"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
func TestCachedHasSynced(t *testing.T) {
called := 0
result := false
cachedFunc := cachedHasSynced(func() bool {
called++
return result
})
if cachedFunc() {
t.Fatal("expected false")
}
if called != 1 {
t.Fatalf("expected called=1, got %d", called)
}
if cachedFunc() {
t.Fatal("expected false")
}
if called != 2 {
t.Fatalf("expected called=2, got %d", called)
}
result = true
if !cachedFunc() {
t.Fatal("expected true")
}
if called != 3 {
t.Fatalf("expected called=3, got %d", called)
}
if !cachedFunc() {
t.Fatal("expected true")
}
if called != 3 {
// no more calls once we return true
t.Fatalf("expected called=3, got %d", called)
}
}
func TestProtectedLister(t *testing.T) {
hasSynced := false
notReadyErr := errors.New("not ready")
fake := &fakeLister{}
l := &protectedLister{
hasSynced: func() bool { return hasSynced },
notReadyErr: notReadyErr,
delegate: fake,
}
if _, err := l.List(nil); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if _, err := l.Get(""); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if fake.called != 0 {
t.Fatalf("expected called=0, got %d", fake.called)
}
fake.called = 0
hasSynced = true
if _, err := l.List(nil); err != errFakeLister {
t.Fatalf("expected %v, got %v", errFakeLister, err)
}
if _, err := l.Get(""); err != errFakeLister {
t.Fatalf("expected %v, got %v", errFakeLister, err)
}
if fake.called != 2 {
t.Fatalf("expected called=2, got %d", fake.called)
}
fake.called = 0
hasSynced = false
if _, err := l.List(nil); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if _, err := l.Get(""); err != notReadyErr {
t.Fatalf("expected %v, got %v", notReadyErr, err)
}
if fake.called != 0 {
t.Fatalf("expected called=2, got %d", fake.called)
}
}
var errFakeLister = errors.New("errFakeLister")
type fakeLister struct {
called int
}
func (f *fakeLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
f.called++
return nil, errFakeLister
}
func (f *fakeLister) Get(name string) (runtime.Object, error) {
f.called++
return nil, errFakeLister
}
func (f *fakeLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
panic("not implemented")
}