diff --git a/pkg/kubelet/util/manager/BUILD b/pkg/kubelet/util/manager/BUILD
index 60005033de..e789de207c 100644
--- a/pkg/kubelet/util/manager/BUILD
+++ b/pkg/kubelet/util/manager/BUILD
@@ -5,6 +5,7 @@ go_library(
     srcs = [
         "cache_based_manager.go",
         "manager.go",
+        "watch_based_manager.go",
     ],
     importpath = "k8s.io/kubernetes/pkg/kubelet/util/manager",
     visibility = ["//visibility:public"],
@@ -13,19 +14,28 @@ go_library(
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
+        "//vendor/k8s.io/client-go/tools/cache:go_default_library",
     ],
 )
 
 go_test(
     name = "go_default_test",
-    srcs = ["cache_based_manager_test.go"],
+    srcs = [
+        "cache_based_manager_test.go",
+        "watch_based_manager_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
+        "//pkg/apis/core/v1:go_default_library",
         "//vendor/github.com/stretchr/testify/assert:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
@@ -33,6 +43,8 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes:go_default_library",
         "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
         "//vendor/k8s.io/client-go/testing:go_default_library",
diff --git a/pkg/kubelet/util/manager/watch_based_manager.go b/pkg/kubelet/util/manager/watch_based_manager.go
new file mode 100644
index 0000000000..719f1f3b1d
--- /dev/null
+++ b/pkg/kubelet/util/manager/watch_based_manager.go
@@ -0,0 +1,194 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// TODO: In scalability tests, using watchBasedManager visibly
+// helped apiserver performance at scale, and we observed no issues
+// even at the scale of ~200k watchers against a single apiserver.
+// However, we need to perform more extensive testing before we
+// enable this in production setups.
+
+package manager
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"k8s.io/api/core/v1"
+	"k8s.io/client-go/tools/cache"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+type listObjectFunc func(string, metav1.ListOptions) (runtime.Object, error)
+type watchObjectFunc func(string, metav1.ListOptions) (watch.Interface, error)
+type newObjectFunc func() runtime.Object
+
+// objectCacheItem is a single item stored in objectCache.
+type objectCacheItem struct {
+	refCount  int
+	store     cache.Store
+	hasSynced func() (bool, error)
+	stopCh    chan struct{}
+}
+
+// objectCache is a local cache of objects propagated via
+// individual watches.
+type objectCache struct {
+	listObject    listObjectFunc
+	watchObject   watchObjectFunc
+	newObject     newObjectFunc
+	groupResource schema.GroupResource
+
+	lock  sync.Mutex
+	items map[objectKey]*objectCacheItem
+}
+
+// NewObjectCache returns a new watch-based instance of the Store interface.
+func NewObjectCache(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource) Store {
+	return &objectCache{
+		listObject:    listObject,
+		watchObject:   watchObject,
+		newObject:     newObject,
+		groupResource: groupResource,
+		items:         make(map[objectKey]*objectCacheItem),
+	}
+}
+
+func (c *objectCache) newStore() cache.Store {
+	// TODO: We may consider creating a dedicated store keeping just a single
+	// item, instead of using a generic store implementation for this purpose.
+	// However, simple benchmarks show that this would only decrease the memory
+	// overhead from ~600B to ~300B per object, so we are not optimizing it
+	// until we see a good reason to.
+	return cache.NewStore(cache.MetaNamespaceKeyFunc)
+}
+
+func (c *objectCache) newReflector(namespace, name string) *objectCacheItem {
+	fieldSelector := fields.Set{"metadata.name": name}.AsSelector().String()
+	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
+		options.FieldSelector = fieldSelector
+		return c.listObject(namespace, options)
+	}
+	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
+		options.FieldSelector = fieldSelector
+		return c.watchObject(namespace, options)
+	}
+	store := c.newStore()
+	reflector := cache.NewNamedReflector(
+		fmt.Sprintf("object-%q/%q", namespace, name),
+		&cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc},
+		c.newObject(),
+		store,
+		0,
+	)
+	stopCh := make(chan struct{})
+	go reflector.Run(stopCh)
+	return &objectCacheItem{
+		refCount:  0,
+		store:     store,
+		hasSynced: func() (bool, error) { return reflector.LastSyncResourceVersion() != "", nil },
+		stopCh:    stopCh,
+	}
+}
+
+func (c *objectCache) AddReference(namespace, name string) {
+	key := objectKey{namespace: namespace, name: name}
+
+	// AddReference is called from RegisterPod, thus it needs to be efficient.
+	// Therefore, it only increases the refCount and, on the first registration
+	// of a given object, starts the corresponding reflector.
+	// It is the responsibility of the first Get operation to wait until the
+	// reflector has propagated the store.
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	item, exists := c.items[key]
+	if !exists {
+		item = c.newReflector(namespace, name)
+		c.items[key] = item
+	}
+	item.refCount++
+}
+
+func (c *objectCache) DeleteReference(namespace, name string) {
+	key := objectKey{namespace: namespace, name: name}
+
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	if item, ok := c.items[key]; ok {
+		item.refCount--
+		if item.refCount == 0 {
+			// Stop the underlying reflector.
+			close(item.stopCh)
+			delete(c.items, key)
+		}
+	}
+}
+
+// key returns the key of an object with a given name and namespace.
+// This has to be in sync with cache.MetaNamespaceKeyFunc.
+func (c *objectCache) key(namespace, name string) string {
+	if len(namespace) > 0 {
+		return namespace + "/" + name
+	}
+	return name
+}
+
+func (c *objectCache) Get(namespace, name string) (runtime.Object, error) {
+	key := objectKey{namespace: namespace, name: name}
+
+	c.lock.Lock()
+	item, exists := c.items[key]
+	c.lock.Unlock()
+
+	if !exists {
+		return nil, fmt.Errorf("object %q/%q not registered", namespace, name)
+	}
+	if err := wait.PollImmediate(10*time.Millisecond, time.Second, item.hasSynced); err != nil {
+		return nil, fmt.Errorf("couldn't propagate object cache: %v", err)
+	}
+
+	obj, exists, err := item.store.GetByKey(c.key(namespace, name))
+	if err != nil {
+		return nil, err
+	}
+	if !exists {
+		return nil, apierrors.NewNotFound(c.groupResource, name)
+	}
+	if object, ok := obj.(runtime.Object); ok {
+		return object, nil
+	}
+	return nil, fmt.Errorf("unexpected object type: %v", obj)
+}
+
+// NewWatchBasedManager creates a manager that keeps a cache of all objects
+// necessary for registered pods.
+// It implements the following logic:
+// - whenever a pod is created or updated, we start individual watches for all
+//   referenced objects that aren't referenced from other registered pods
+// - every GetObject() returns a value from the local cache propagated via watches
+func NewWatchBasedManager(listObject listObjectFunc, watchObject watchObjectFunc, newObject newObjectFunc, groupResource schema.GroupResource, getReferencedObjects func(*v1.Pod) sets.String) Manager {
+	objectStore := NewObjectCache(listObject, watchObject, newObject, groupResource)
+	return NewCacheBasedManager(objectStore, getReferencedObjects)
+}
diff --git a/pkg/kubelet/util/manager/watch_based_manager_test.go b/pkg/kubelet/util/manager/watch_based_manager_test.go
new file mode 100644
index 0000000000..fc27a7374d
--- /dev/null
+++ b/pkg/kubelet/util/manager/watch_based_manager_test.go
@@ -0,0 +1,184 @@
+/*
+Copyright 2018 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package manager
+
+import (
+	"fmt"
+	"strings"
+	"testing"
+	"time"
+
+	"k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+
+	clientset "k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/fake"
+	core "k8s.io/client-go/testing"
+
+	corev1 "k8s.io/kubernetes/pkg/apis/core/v1"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func listSecret(fakeClient clientset.Interface) listObjectFunc {
+	return func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
+		return fakeClient.CoreV1().Secrets(namespace).List(opts)
+	}
+}
+
+func watchSecret(fakeClient clientset.Interface) watchObjectFunc {
+	return func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
+		return fakeClient.CoreV1().Secrets(namespace).Watch(opts)
+	}
+}
+
+func newSecretCache(fakeClient clientset.Interface) *objectCache {
+	return &objectCache{
+		listObject:    listSecret(fakeClient),
+		watchObject:   watchSecret(fakeClient),
+		newObject:     func() runtime.Object { return &v1.Secret{} },
+		groupResource: corev1.Resource("secret"),
+		items:         make(map[objectKey]*objectCacheItem),
+	}
+}
+
+func TestSecretCache(t *testing.T) {
+	fakeClient := &fake.Clientset{}
+
+	listReactor := func(a core.Action) (bool, runtime.Object, error) {
+		result := &v1.SecretList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: "123",
+			},
+		}
+		return true, result, nil
+	}
+	fakeClient.AddReactor("list", "secrets", listReactor)
+	fakeWatch := watch.NewFake()
+	fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+
+	store := newSecretCache(fakeClient)
+
+	store.AddReference("ns", "name")
+	_, err := store.Get("ns", "name")
+	if !apierrors.IsNotFound(err) {
+		t.Errorf("Expected NotFound error, got: %v", err)
+	}
+
+	// Eventually we should be able to read the added secret.
+	secret := &v1.Secret{
+		ObjectMeta: metav1.ObjectMeta{Name: "name", Namespace: "ns", ResourceVersion: "125"},
+	}
+	fakeWatch.Add(secret)
+	getFn := func() (bool, error) {
+		object, err := store.Get("ns", "name")
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				return false, nil
+			}
+			return false, err
+		}
+		secret := object.(*v1.Secret)
+		if secret == nil || secret.Name != "name" || secret.Namespace != "ns" {
+			return false, fmt.Errorf("unexpected secret: %v", secret)
+		}
+		return true, nil
+	}
+	if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+
+	// Eventually we should observe the secret deletion.
+	fakeWatch.Delete(secret)
+	getFn = func() (bool, error) {
+		_, err := store.Get("ns", "name")
+		if err != nil {
+			if apierrors.IsNotFound(err) {
+				return true, nil
+			}
+			return false, err
+		}
+		return false, nil
+	}
+	if err := wait.PollImmediate(10*time.Millisecond, time.Second, getFn); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+
+	store.DeleteReference("ns", "name")
+	_, err = store.Get("ns", "name")
+	if err == nil || !strings.Contains(err.Error(), "not registered") {
+		t.Errorf("unexpected error: %v", err)
+	}
+}
+
+func TestSecretCacheMultipleRegistrations(t *testing.T) {
+	fakeClient := &fake.Clientset{}
+
+	listReactor := func(a core.Action) (bool, runtime.Object, error) {
+		result := &v1.SecretList{
+			ListMeta: metav1.ListMeta{
+				ResourceVersion: "123",
+			},
+		}
+		return true, result, nil
+	}
+	fakeClient.AddReactor("list", "secrets", listReactor)
+	fakeWatch := watch.NewFake()
+	fakeClient.AddWatchReactor("secrets", core.DefaultWatchReactor(fakeWatch, nil))
+
+	store := newSecretCache(fakeClient)
+
+	store.AddReference("ns", "name")
+	// This should trigger List and Watch actions eventually.
+	actionsFn := func() (bool, error) {
+		actions := fakeClient.Actions()
+		if len(actions) > 2 {
+			return false, fmt.Errorf("too many actions: %v", actions)
+		}
+		if len(actions) < 2 {
+			return false, nil
+		}
+		if actions[0].GetVerb() != "list" || actions[1].GetVerb() != "watch" {
+			return false, fmt.Errorf("unexpected actions: %v", actions)
+		}
+		return true, nil
+	}
+	if err := wait.PollImmediate(10*time.Millisecond, time.Second, actionsFn); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+
+	// Next registrations shouldn't trigger any new actions.
+	for i := 0; i < 20; i++ {
+		store.AddReference("ns", "name")
+		store.DeleteReference("ns", "name")
+	}
+	actions := fakeClient.Actions()
+	assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
+
+	// Final delete also doesn't trigger any action.
+	store.DeleteReference("ns", "name")
+	_, err := store.Get("ns", "name")
+	if err == nil || !strings.Contains(err.Error(), "not registered") {
+		t.Errorf("unexpected error: %v", err)
+	}
+	actions = fakeClient.Actions()
+	assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
}
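
For context, here is a minimal sketch of how a caller could wire NewWatchBasedManager up for secrets on the kubelet side. It is not part of this change: the package name, newWatchingSecretManager, and the kubeClient parameter are illustrative assumptions; only VisitPodSecretNames (from pkg/api/v1/pod) and the client-go calls are existing APIs.

package secretmanager // hypothetical package, for illustration only

import (
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"

	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/pkg/kubelet/util/manager"
)

// newWatchingSecretManager (hypothetical) supplies the three ingredients
// NewWatchBasedManager needs: namespace-scoped list/watch closures, a factory
// for empty Secret objects, and a function collecting the secret names a pod
// references.
func newWatchingSecretManager(kubeClient clientset.Interface) manager.Manager {
	listSecret := func(namespace string, opts metav1.ListOptions) (runtime.Object, error) {
		return kubeClient.CoreV1().Secrets(namespace).List(opts)
	}
	watchSecret := func(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
		return kubeClient.CoreV1().Secrets(namespace).Watch(opts)
	}
	newSecret := func() runtime.Object {
		return &v1.Secret{}
	}
	// Walk the pod spec (volumes, env vars, image pull secrets) and collect
	// every referenced secret name; each name gets its own single-object watch.
	getSecretNames := func(pod *v1.Pod) sets.String {
		result := sets.NewString()
		podutil.VisitPodSecretNames(pod, func(name string) bool {
			result.Insert(name)
			return true
		})
		return result
	}
	gr := schema.GroupResource{Group: "", Resource: "secret"}
	return manager.NewWatchBasedManager(listSecret, watchSecret, newSecret, gr, getSecretNames)
}

Because each reflector lists and watches with a metadata.name field selector, the apiserver only streams events for the specific objects a node actually references, which is what keeps per-watch cost low in the scalability tests mentioned in the TODO above.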