mirror of https://github.com/k3s-io/k3s
WatchingSecretManager
parent
a481f4bbe8
commit
068844aeb1
|
@ -5,6 +5,7 @@ go_library(
|
||||||
srcs = [
|
srcs = [
|
||||||
"cache_based_manager.go",
|
"cache_based_manager.go",
|
||||||
"manager.go",
|
"manager.go",
|
||||||
|
"watch_based_manager.go",
|
||||||
],
|
],
|
||||||
importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
|
importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
|
@ -13,19 +14,28 @@ go_library(
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
||||||
|
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
go_test(
|
go_test(
|
||||||
name = "go_default_test",
|
name = "go_default_test",
|
||||||
srcs = ["cache_based_manager_test.go"],
|
srcs = [
|
||||||
|
"cache_based_manager_test.go",
|
||||||
|
"watch_based_manager_test.go",
|
||||||
|
],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
"//pkg/api/v1/pod:go_default_library",
|
"//pkg/api/v1/pod:go_default_library",
|
||||||
|
"//pkg/apis/core/v1:go_default_library",
|
||||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
@ -33,6 +43,8 @@ go_test(
|
||||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
// TODO: We did some scalability tests and using watchBasedManager
|
||||||
|
// seems to help with apiserver performance at scale visibly.
|
||||||
|
// No issues we also observed at the scale of ~200k watchers with a
|
||||||
|
// single apiserver.
|
||||||
|
// However, we need to perform more extensive testing before we
|
||||||
|
// enable this in production setups.
|
||||||
|
|
||||||
|
package manager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/client-go/tools/cache"
|
||||||
|
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
)
|
||||||
|
|
||||||
|
type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
|
||||||
|
type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
|
||||||
|
type newObjectFunc func() runtime.Object
|
||||||
|
|
||||||
|
// objectCacheItem is a single item stored in objectCache.
|
||||||
|
type objectCacheItem struct {
|
||||||
|
refCount int
|
||||||
|
store cache.Store
|
||||||
|
hasSynced func() (bool, error)
|
||||||
|
stopCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// objectCache is a local cache of objects propagated via
|
||||||
|
// individual watches.
|
||||||
|
type objectCache struct {
|
||||||
|
listObject listObjectFunc
|
||||||
|
watchObject watchObjectFunc
|
||||||
|
newObject newObjectFunc
|
||||||
|
groupResource schema.GroupResource
|
||||||
|
|
||||||
|
lock sync.Mutex
|
||||||
|
items map[objectKey]*objectCacheItem
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewObjectCache returns a new watch-based instance of Store interface.
|
||||||
|
func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
|
||||||
|
return &objectCache{
|
||||||
|
listObject: listObject,
|
||||||
|
watchObject: watchObject,
|
||||||
|
newObject: newObject,
|
||||||
|
groupResource: groupResource,
|
||||||
|
items: make(map[objectKey]*objectCacheItem),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *objectCache) newStore() cache.Store {
|
||||||
|
// TODO: We may consider created a dedicated store keeping just a single
|
||||||
|
// item, instead of using a generic store implementation for this purpose.
|
||||||
|
// However, simple benchmarks show that memory overhead in that case is
|
||||||
|
// decrease from ~600B to ~300B per object. So we are not optimizing it
|
||||||
|
// until we will see a good reason for that.
|
||||||
|
return cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
|
||||||
|
fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
|
||||||
|
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
options.FieldSelector = fieldSelector
|
||||||
|
return c.listObject(namespace, options)
|
||||||
|
}
|
||||||
|
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
options.FieldSelector = fieldSelector
|
||||||
|
return c.watchObject(namespace, options)
|
||||||
|
}
|
||||||
|
store := c.newStore()
|
||||||
|
reflector := cache.NewNamedReflector(
|
||||||
|
fmt.Sprintf("object-%q/%q", namespace, name),
|
||||||
|
&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
|
||||||
|
c.newObject(),
|
||||||
|
store,
|
||||||
|
0,
|
||||||
|
)
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
go reflector.Run(stopCh)
|
||||||
|
return &objectCacheItem{
|
||||||
|
refCount: 0,
|
||||||
|
store: store,
|
||||||
|
hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
|
||||||
|
stopCh: stopCh,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *objectCache) AddReference(namespace, name string) {
|
||||||
|
key := objectKey{namespace: namespace, name: name}
|
||||||
|
|
||||||
|
// AddReference is called from RegisterPod thus it needs to be efficient.
|
||||||
|
// Thus, it is only increaisng refCount and in case of first registration
|
||||||
|
// of a given object it starts corresponding reflector.
|
||||||
|
// It's responsibility of the first Get operation to wait until the
|
||||||
|
// reflector propagated the store.
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
item, exists := c.items[key]
|
||||||
|
if !exists {
|
||||||
|
item = c.newReflector(namespace, name)
|
||||||
|
c.items[key] = item
|
||||||
|
}
|
||||||
|
item.refCount++
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *objectCache) DeleteReference(namespace, name string) {
|
||||||
|
key := objectKey{namespace: namespace, name: name}
|
||||||
|
|
||||||
|
c.lock.Lock()
|
||||||
|
defer c.lock.Unlock()
|
||||||
|
if item, ok := c.items[key]; ok {
|
||||||
|
item.refCount--
|
||||||
|
if item.refCount == 0 {
|
||||||
|
// Stop the underlying reflector.
|
||||||
|
close(item.stopCh)
|
||||||
|
delete(c.items, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// key returns key of an object with a given name and namespace.
|
||||||
|
// This has to be in-sync with cache.MetaNamespaceKeyFunc.
|
||||||
|
func (c *objectCache) key(namespace, name string) string {
|
||||||
|
if len(namespace) > 0 {
|
||||||
|
return namespace + "/" + name
|
||||||
|
}
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
|
||||||
|
key := objectKey{namespace: namespace, name: name}
|
||||||
|
|
||||||
|
c.lock.Lock()
|
||||||
|
item, exists := c.items[key]
|
||||||
|
c.lock.Unlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
|
||||||
|
}
|
||||||
|
if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
|
||||||
|
return nil, fmt.Errorf("couldn't propagate object cache: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, exists, err := item.store.GetByKey(c.key(namespace, name))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
return nil, apierrors.NewNotFound(c.groupResource, name)
|
||||||
|
}
|
||||||
|
if object, ok := obj.(runtime.Object); ok {
|
||||||
|
return object, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("unexpected object type: %v", obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWatchBasedManager creates a manager that keeps a cache of all objects
|
||||||
|
// necessary for registered pods.
|
||||||
|
// It implements the following logic:
|
||||||
|
// - whenever a pod is created or updated, we start individual watches for all
|
||||||
|
// referenced objects that aren't referenced from other registered pods
|
||||||
|
// - every GetObject() returns a value from local cache propagated via watches
|
||||||
|
func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
|
||||||
|
objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
|
||||||
|
return NewCacheBasedManager(objectStore, getReferencedObjects)
|
||||||
|
}
|
|
@ -0,0 +1,184 @@
|
||||||
|
/*
|
||||||
|
Copyright 2018 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package manager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
|
|
||||||
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
core "k8s.io/client-go/testing"
|
||||||
|
|
||||||
|
corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
)
|
||||||
|
|
||||||
|
func listSecret(fakeClient clientset.Interface) listObjectFunc {
|
||||||
|
return func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
|
||||||
|
return fakeClient.CoreV1().Secrets(namespace).List(opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
|
||||||
|
return func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
|
||||||
|
return fakeClient.CoreV1().Secrets(namespace).Watch(opts)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSecretCache(fakeClient clientset.Interface) *objectCache {
|
||||||
|
return &objectCache{
|
||||||
|
listObject: listSecret(fakeClient),
|
||||||
|
watchObject: watchSecret(fakeClient),
|
||||||
|
newObject: func() runtime.Object { return &v1.Secret{} },
|
||||||
|
groupResource: corev1.Resource("secret"),
|
||||||
|
items: make(map[objectKey]*objectCacheItem),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSecretCache(t *testing.T) {
|
||||||
|
fakeClient := &fake.Clientset{}
|
||||||
|
|
||||||
|
listReactor := func(a core.Action) (bool, runtime.Object, error) {
|
||||||
|
result := &v1.SecretList{
|
||||||
|
ListMeta: metav1.ListMeta{
|
||||||
|
ResourceVersion: "123",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return true, result, nil
|
||||||
|
}
|
||||||
|
fakeClient.AddReactor("list", "secrets", listReactor)
|
||||||
|
fakeWatch := watch.NewFake()
|
||||||
|
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
|
||||||
|
|
||||||
|
store := newSecretCache(fakeClient)
|
||||||
|
|
||||||
|
store.AddReference("ns", "name")
|
||||||
|
_, err := store.Get("ns", "name")
|
||||||
|
if !apierrors.IsNotFound(err) {
|
||||||
|
t.Errorf("Expected NotFound error, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Eventually we should be able to read added secret.
|
||||||
|
secret := &v1.Secret{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", ResourceVersion: "125"},
|
||||||
|
}
|
||||||
|
fakeWatch.Add(secret)
|
||||||
|
getFn := func() (bool, error) {
|
||||||
|
object, err := store.Get("ns", "name")
|
||||||
|
if err != nil {
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
secret := object.(*v1.Secret)
|
||||||
|
if secret == nil || secret.Name != "name" || secret.Namespace != "ns" {
|
||||||
|
return false, fmt.Errorf("unexpected secret: %v", secret)
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Eventually we should observer secret deletion.
|
||||||
|
fakeWatch.Delete(secret)
|
||||||
|
getFn = func() (bool, error) {
|
||||||
|
_, err := store.Get("ns", "name")
|
||||||
|
if err != nil {
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
store.DeleteReference("ns", "name")
|
||||||
|
_, err = store.Get("ns", "name")
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "not registered") {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSecretCacheMultipleRegistrations(t *testing.T) {
|
||||||
|
fakeClient := &fake.Clientset{}
|
||||||
|
|
||||||
|
listReactor := func(a core.Action) (bool, runtime.Object, error) {
|
||||||
|
result := &v1.SecretList{
|
||||||
|
ListMeta: metav1.ListMeta{
|
||||||
|
ResourceVersion: "123",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return true, result, nil
|
||||||
|
}
|
||||||
|
fakeClient.AddReactor("list", "secrets", listReactor)
|
||||||
|
fakeWatch := watch.NewFake()
|
||||||
|
fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
|
||||||
|
|
||||||
|
store := newSecretCache(fakeClient)
|
||||||
|
|
||||||
|
store.AddReference("ns", "name")
|
||||||
|
// This should trigger List and Watch actions eventually.
|
||||||
|
actionsFn := func() (bool, error) {
|
||||||
|
actions := fakeClient.Actions()
|
||||||
|
if len(actions) > 2 {
|
||||||
|
return false, fmt.Errorf("too many actions: %v", actions)
|
||||||
|
}
|
||||||
|
if len(actions) < 2 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if actions[0].GetVerb() != "list" || actions[1].GetVerb() != "watch" {
|
||||||
|
return false, fmt.Errorf("unexpected actions: %v", actions)
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
if err := wait.PollImmediate(10*time.Millisecond, time.Second, actionsFn); err != nil {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next registrations shouldn't trigger any new actions.
|
||||||
|
for i := 0; i < 20; i++ {
|
||||||
|
store.AddReference("ns", "name")
|
||||||
|
store.DeleteReference("ns", "name")
|
||||||
|
}
|
||||||
|
actions := fakeClient.Actions()
|
||||||
|
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
|
||||||
|
|
||||||
|
// Final delete also doesn't trigger any action.
|
||||||
|
store.DeleteReference("ns", "name")
|
||||||
|
_, err := store.Get("ns", "name")
|
||||||
|
if err == nil || !strings.Contains(err.Error(), "not registered") {
|
||||||
|
t.Errorf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
actions = fakeClient.Actions()
|
||||||
|
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
|
||||||
|
}
|
Loading…
Reference in New Issue