mirror of https://github.com/k3s-io/k3s
Refactor ConfigMapManager
parent
a481f4bbe8
commit
01e58de70c
|
@ -15,13 +15,12 @@ go_library(
|
|||
importpath = "k8s.io/kubernetes/pkg/kubelet/configmap",
|
||||
deps = [
|
||||
"//pkg/api/v1/pod:go_default_library",
|
||||
"//pkg/kubelet/util:go_default_library",
|
||||
"//pkg/kubelet/util/manager:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
],
|
||||
)
|
||||
|
@ -44,13 +43,12 @@ go_test(
|
|||
srcs = ["configmap_manager_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
"//pkg/kubelet/util/manager:go_default_library",
|
||||
"//vendor/k8s.io/api/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -18,26 +18,19 @@ package configmap
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTTL = time.Minute
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
// Get configmap by configmap namespace and name.
|
||||
GetConfigMap(namespace, name string) (*v1.ConfigMap, error)
|
||||
|
@ -73,191 +66,31 @@ func (s *simpleConfigMapManager) RegisterPod(pod *v1.Pod) {
|
|||
func (s *simpleConfigMapManager) UnregisterPod(pod *v1.Pod) {
|
||||
}
|
||||
|
||||
type GetObjectTTLFunc func() (time.Duration, bool)
|
||||
|
||||
type objectKey struct {
|
||||
namespace string
|
||||
name string
|
||||
// configMapManager keeps a cache of all configmaps necessary
|
||||
// for registered pods. Different implementation of the store
|
||||
// may result in different semantics for freshness of configmaps
|
||||
// (e.g. ttl-based implementation vs watch-based implementation).
|
||||
type configMapManager struct {
|
||||
manager manager.Manager
|
||||
}
|
||||
|
||||
// configMapStoreItems is a single item stored in configMapStore.
|
||||
type configMapStoreItem struct {
|
||||
refCount int
|
||||
configMap *configMapData
|
||||
}
|
||||
|
||||
type configMapData struct {
|
||||
sync.Mutex
|
||||
|
||||
configMap *v1.ConfigMap
|
||||
err error
|
||||
lastUpdateTime time.Time
|
||||
}
|
||||
|
||||
// configMapStore is a local cache of configmaps.
|
||||
type configMapStore struct {
|
||||
kubeClient clientset.Interface
|
||||
clock clock.Clock
|
||||
|
||||
lock sync.Mutex
|
||||
items map[objectKey]*configMapStoreItem
|
||||
|
||||
defaultTTL time.Duration
|
||||
getTTL GetObjectTTLFunc
|
||||
}
|
||||
|
||||
func newConfigMapStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *configMapStore {
|
||||
return &configMapStore{
|
||||
kubeClient: kubeClient,
|
||||
clock: clock,
|
||||
items: make(map[objectKey]*configMapStoreItem),
|
||||
defaultTTL: ttl,
|
||||
getTTL: getTTL,
|
||||
func (c *configMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) {
|
||||
object, err := c.manager.GetObject(namespace, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func isConfigMapOlder(newConfigMap, oldConfigMap *v1.ConfigMap) bool {
|
||||
if newConfigMap == nil || oldConfigMap == nil {
|
||||
return false
|
||||
if configmap, ok := object.(*v1.ConfigMap); ok {
|
||||
return configmap, nil
|
||||
}
|
||||
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newConfigMap)
|
||||
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldConfigMap)
|
||||
return newVersion < oldVersion
|
||||
return nil, fmt.Errorf("unexpected object type: %v", object)
|
||||
}
|
||||
|
||||
func (s *configMapStore) Add(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
// Add is called from RegisterPod, thus it needs to be efficient.
|
||||
// Thus Add() is only increasing refCount and generation of a given configmap.
|
||||
// Then Get() is responsible for fetching if needed.
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
item = &configMapStoreItem{
|
||||
refCount: 0,
|
||||
configMap: &configMapData{},
|
||||
}
|
||||
s.items[key] = item
|
||||
}
|
||||
|
||||
item.refCount++
|
||||
// This will trigger fetch on the next Get() operation.
|
||||
item.configMap = nil
|
||||
func (c *configMapManager) RegisterPod(pod *v1.Pod) {
|
||||
c.manager.RegisterPod(pod)
|
||||
}
|
||||
|
||||
func (s *configMapStore) Delete(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if item, ok := s.items[key]; ok {
|
||||
item.refCount--
|
||||
if item.refCount == 0 {
|
||||
delete(s.items, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
||||
return func() (time.Duration, bool) {
|
||||
node, err := getNode()
|
||||
if err != nil {
|
||||
return time.Duration(0), false
|
||||
}
|
||||
if node != nil && node.Annotations != nil {
|
||||
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
|
||||
if intValue, err := strconv.Atoi(value); err == nil {
|
||||
return time.Duration(intValue) * time.Second, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return time.Duration(0), false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *configMapStore) isConfigMapFresh(data *configMapData) bool {
|
||||
configMapTTL := s.defaultTTL
|
||||
if ttl, ok := s.getTTL(); ok {
|
||||
configMapTTL = ttl
|
||||
}
|
||||
return s.clock.Now().Before(data.lastUpdateTime.Add(configMapTTL))
|
||||
}
|
||||
|
||||
func (s *configMapStore) Get(namespace, name string) (*v1.ConfigMap, error) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
data := func() *configMapData {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
if item.configMap == nil {
|
||||
item.configMap = &configMapData{}
|
||||
}
|
||||
return item.configMap
|
||||
}()
|
||||
if data == nil {
|
||||
return nil, fmt.Errorf("configmap %q/%q not registered", namespace, name)
|
||||
}
|
||||
|
||||
// After updating data in configMapStore, lock the data, fetch configMap if
|
||||
// needed and return data.
|
||||
data.Lock()
|
||||
defer data.Unlock()
|
||||
if data.err != nil || !s.isConfigMapFresh(data) {
|
||||
opts := metav1.GetOptions{}
|
||||
if data.configMap != nil && data.err == nil {
|
||||
// This is just a periodic refresh of a configmap we successfully fetched previously.
|
||||
// In this case, server data from apiserver cache to reduce the load on both
|
||||
// etcd and apiserver (the cache is eventually consistent).
|
||||
util.FromApiserverCache(&opts)
|
||||
}
|
||||
configMap, err := s.kubeClient.CoreV1().ConfigMaps(namespace).Get(name, opts)
|
||||
if err != nil && !apierrors.IsNotFound(err) && data.configMap == nil && data.err == nil {
|
||||
// Couldn't fetch the latest configmap, but there is no cached data to return.
|
||||
// Return the fetch result instead.
|
||||
return configMap, err
|
||||
}
|
||||
if (err == nil && !isConfigMapOlder(configMap, data.configMap)) || apierrors.IsNotFound(err) {
|
||||
// If the fetch succeeded with a newer version of the configmap, or if the
|
||||
// configmap could not be found in the apiserver, update the cached data to
|
||||
// reflect the current status.
|
||||
data.configMap = configMap
|
||||
data.err = err
|
||||
data.lastUpdateTime = s.clock.Now()
|
||||
}
|
||||
}
|
||||
return data.configMap, data.err
|
||||
}
|
||||
|
||||
// cachingConfigMapManager keeps a cache of all configmaps necessary for registered pods.
|
||||
// It implements the following logic:
|
||||
// - whenever a pod is created or updated, the cached versions of all its configmaps
|
||||
// are invalidated
|
||||
// - every GetConfigMap() call tries to fetch the value from local cache; if it is
|
||||
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||
// value in cache; otherwise it is just fetched from cache
|
||||
type cachingConfigMapManager struct {
|
||||
configMapStore *configMapStore
|
||||
|
||||
lock sync.Mutex
|
||||
registeredPods map[objectKey]*v1.Pod
|
||||
}
|
||||
|
||||
func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager {
|
||||
csm := &cachingConfigMapManager{
|
||||
configMapStore: newConfigMapStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL),
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
return csm
|
||||
}
|
||||
|
||||
func (c *cachingConfigMapManager) GetConfigMap(namespace, name string) (*v1.ConfigMap, error) {
|
||||
return c.configMapStore.Get(namespace, name)
|
||||
func (c *configMapManager) UnregisterPod(pod *v1.Pod) {
|
||||
c.manager.UnregisterPod(pod)
|
||||
}
|
||||
|
||||
func getConfigMapNames(pod *v1.Pod) sets.String {
|
||||
|
@ -269,39 +102,24 @@ func getConfigMapNames(pod *v1.Pod) sets.String {
|
|||
return result
|
||||
}
|
||||
|
||||
func (c *cachingConfigMapManager) RegisterPod(pod *v1.Pod) {
|
||||
names := getConfigMapNames(pod)
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
for name := range names {
|
||||
c.configMapStore.Add(pod.Namespace, name)
|
||||
}
|
||||
var prev *v1.Pod
|
||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||
prev = c.registeredPods[key]
|
||||
c.registeredPods[key] = pod
|
||||
if prev != nil {
|
||||
for name := range getConfigMapNames(prev) {
|
||||
// On an update, the .Add() call above will have re-incremented the
|
||||
// ref count of any existing items, so any configmaps that are in both
|
||||
// names and prev need to have their ref counts decremented. Any that
|
||||
// are only in prev need to be completely removed. This unconditional
|
||||
// call takes care of both cases.
|
||||
c.configMapStore.Delete(prev.Namespace, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
const (
|
||||
defaultTTL = time.Minute
|
||||
)
|
||||
|
||||
func (c *cachingConfigMapManager) UnregisterPod(pod *v1.Pod) {
|
||||
var prev *v1.Pod
|
||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
prev = c.registeredPods[key]
|
||||
delete(c.registeredPods, key)
|
||||
if prev != nil {
|
||||
for name := range getConfigMapNames(prev) {
|
||||
c.configMapStore.Delete(prev.Namespace, name)
|
||||
}
|
||||
// NewCachingConfigMapManager creates a manager that keeps a cache of all configmaps
|
||||
// necessary for registered pods.
|
||||
// It implement the following logic:
|
||||
// - whenever a pod is create or updated, the cached versions of all configmaps
|
||||
// are invalidated
|
||||
// - every GetObject() call tries to fetch the value from local cache; if it is
|
||||
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||
// value in cache; otherwise it is just fetched from cache
|
||||
func NewCachingConfigMapManager(kubeClient clientset.Interface, getTTL manager.GetObjectTTLFunc) Manager {
|
||||
getConfigMap := func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
|
||||
return kubeClient.CoreV1().ConfigMaps(namespace).Get(name, opts)
|
||||
}
|
||||
configMapStore := manager.NewObjectStore(getConfigMap, clock.RealClock{}, getTTL, defaultTTL)
|
||||
return &configMapManager{
|
||||
manager: manager.NewCacheBasedManager(configMapStore, getConfigMapNames),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,30 +18,27 @@ package configmap
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
core "k8s.io/client-go/testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/manager"
|
||||
)
|
||||
|
||||
func checkConfigMap(t *testing.T, store *configMapStore, ns, name string, shouldExist bool) {
|
||||
func checkObject(t *testing.T, store manager.Store, ns, name string, shouldExist bool) {
|
||||
_, err := store.Get(ns, name)
|
||||
if shouldExist && err != nil {
|
||||
t.Errorf("unexpected actions: %#v", err)
|
||||
}
|
||||
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("configmap %q/%q not registered", ns, name))) {
|
||||
if !shouldExist && (err == nil || !strings.Contains(err.Error(), fmt.Sprintf("object %q/%q not registered", ns, name))) {
|
||||
t.Errorf("unexpected actions: %#v", err)
|
||||
}
|
||||
}
|
||||
|
@ -50,242 +47,9 @@ func noObjectTTL() (time.Duration, bool) {
|
|||
return time.Duration(0), false
|
||||
}
|
||||
|
||||
func TestConfigMapStore(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
store.Add("ns1", "name1")
|
||||
store.Add("ns2", "name2")
|
||||
store.Add("ns1", "name1")
|
||||
store.Add("ns1", "name1")
|
||||
store.Delete("ns1", "name1")
|
||||
store.Delete("ns2", "name2")
|
||||
store.Add("ns3", "name3")
|
||||
|
||||
// Adds don't issue Get requests.
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Should issue Get request
|
||||
store.Get("ns1", "name1")
|
||||
// Shouldn't issue Get request, as configMap is not registered
|
||||
store.Get("ns2", "name2")
|
||||
// Should issue Get request
|
||||
store.Get("ns3", "name3")
|
||||
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 2, len(actions), "unexpected actions: %#v", actions)
|
||||
|
||||
for _, a := range actions {
|
||||
assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a)
|
||||
}
|
||||
|
||||
checkConfigMap(t, store, "ns1", "name1", true)
|
||||
checkConfigMap(t, store, "ns2", "name2", false)
|
||||
checkConfigMap(t, store, "ns3", "name3", true)
|
||||
checkConfigMap(t, store, "ns4", "name4", false)
|
||||
}
|
||||
|
||||
func TestConfigMapStoreDeletingConfigMap(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
store.Add("ns", "name")
|
||||
|
||||
result := &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}}
|
||||
fakeClient.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, result, nil
|
||||
})
|
||||
configMap, err := store.Get("ns", "name")
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(configMap, result) {
|
||||
t.Errorf("Unexpected configMap: %v", configMap)
|
||||
}
|
||||
|
||||
fakeClient.PrependReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) {
|
||||
return true, &v1.ConfigMap{}, apierrors.NewNotFound(v1.Resource("configMap"), "name")
|
||||
})
|
||||
configMap, err = store.Get("ns", "name")
|
||||
if err == nil || !apierrors.IsNotFound(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(configMap, &v1.ConfigMap{}) {
|
||||
t.Errorf("Unexpected configMap: %v", configMap)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigMapStoreGetAlwaysRefresh(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, 0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
}
|
||||
fakeClient.ClearActions()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(100)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func(i int) {
|
||||
store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10))
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 100, len(actions), "unexpected actions: %#v", actions)
|
||||
|
||||
for _, a := range actions {
|
||||
assert.True(t, a.Matches("get", "configmaps"), "unexpected actions: %#v", a)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigMapStoreGetNeverRefresh(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
}
|
||||
fakeClient.ClearActions()
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(100)
|
||||
for i := 0; i < 100; i++ {
|
||||
go func(i int) {
|
||||
store.Get(fmt.Sprintf("ns-%d", i%10), fmt.Sprintf("name-%d", i%10))
|
||||
wg.Done()
|
||||
}(i)
|
||||
}
|
||||
wg.Wait()
|
||||
actions := fakeClient.Actions()
|
||||
// Only first Get, should forward the Get request.
|
||||
assert.Equal(t, 10, len(actions), "unexpected actions: %#v", actions)
|
||||
}
|
||||
|
||||
func TestCustomTTL(t *testing.T) {
|
||||
ttl := time.Duration(0)
|
||||
ttlExists := false
|
||||
customTTL := func() (time.Duration, bool) {
|
||||
return ttl, ttlExists
|
||||
}
|
||||
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Time{})
|
||||
store := newConfigMapStore(fakeClient, fakeClock, customTTL, time.Minute)
|
||||
|
||||
store.Add("ns", "name")
|
||||
store.Get("ns", "name")
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Set 0-ttl and see if that works.
|
||||
ttl = time.Duration(0)
|
||||
ttlExists = true
|
||||
store.Get("ns", "name")
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Set 5-minute ttl and see if this works.
|
||||
ttl = time.Duration(5) * time.Minute
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Still no effect after 4 minutes.
|
||||
fakeClock.Step(4 * time.Minute)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Now it should have an effect.
|
||||
fakeClock.Step(time.Minute)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Now remove the custom ttl and see if that works.
|
||||
ttlExists = false
|
||||
fakeClock.Step(55 * time.Second)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
|
||||
// Pass the minute and it should be triggered now.
|
||||
fakeClock.Step(5 * time.Second)
|
||||
store.Get("ns", "name")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
|
||||
}
|
||||
|
||||
func TestParseNodeAnnotation(t *testing.T) {
|
||||
testCases := []struct {
|
||||
node *v1.Node
|
||||
err error
|
||||
exists bool
|
||||
ttl time.Duration
|
||||
}{
|
||||
{
|
||||
node: nil,
|
||||
err: fmt.Errorf("error"),
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
},
|
||||
},
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
},
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "bad"},
|
||||
},
|
||||
},
|
||||
exists: false,
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "0"},
|
||||
},
|
||||
},
|
||||
exists: true,
|
||||
ttl: time.Duration(0),
|
||||
},
|
||||
{
|
||||
node: &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node",
|
||||
Annotations: map[string]string{v1.ObjectTTLAnnotationKey: "60"},
|
||||
},
|
||||
},
|
||||
exists: true,
|
||||
ttl: time.Minute,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
getNode := func() (*v1.Node, error) { return testCase.node, testCase.err }
|
||||
ttl, exists := GetObjectTTLFromNodeFunc(getNode)()
|
||||
if exists != testCase.exists {
|
||||
t.Errorf("%d: incorrect parsing: %t", i, exists)
|
||||
continue
|
||||
}
|
||||
if exists && ttl != testCase.ttl {
|
||||
t.Errorf("%d: incorrect ttl: %v", i, ttl)
|
||||
}
|
||||
func getConfigMap(fakeClient clientset.Interface) manager.GetObjectFunc {
|
||||
return func(namespace, name string, opts metav1.GetOptions) (runtime.Object, error) {
|
||||
return fakeClient.CoreV1().ConfigMaps(namespace).Get(name, opts)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -348,152 +112,11 @@ func podWithConfigMaps(ns, podName string, toAttach configMapsToAttach) *v1.Pod
|
|||
return pod
|
||||
}
|
||||
|
||||
func TestCacheInvalidation(t *testing.T) {
|
||||
func TestCacheBasedConfigMapManager(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
manager := &cachingConfigMapManager{
|
||||
configMapStore: store,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
|
||||
// Create a pod with some configMaps.
|
||||
s1 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}},
|
||||
{envVarNames: []string{"s2"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1))
|
||||
// Fetch both configMaps - this should triggger get operations.
|
||||
store.Get("ns1", "s1")
|
||||
store.Get("ns1", "s10")
|
||||
store.Get("ns1", "s2")
|
||||
actions := fakeClient.Actions()
|
||||
assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Update a pod with a new configMap.
|
||||
s2 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}},
|
||||
{envVarNames: []string{"s2"}, envFromNames: []string{"s20"}},
|
||||
},
|
||||
volumes: []string{"s3"},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s2))
|
||||
// All configMaps should be invalidated - this should trigger get operations.
|
||||
store.Get("ns1", "s1")
|
||||
store.Get("ns1", "s2")
|
||||
store.Get("ns1", "s20")
|
||||
store.Get("ns1", "s3")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 4, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
|
||||
// Create a new pod that is refencing the first three configMaps - those should
|
||||
// be invalidated.
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1))
|
||||
store.Get("ns1", "s1")
|
||||
store.Get("ns1", "s10")
|
||||
store.Get("ns1", "s2")
|
||||
store.Get("ns1", "s20")
|
||||
store.Get("ns1", "s3")
|
||||
actions = fakeClient.Actions()
|
||||
assert.Equal(t, 3, len(actions), "unexpected actions: %#v", actions)
|
||||
fakeClient.ClearActions()
|
||||
}
|
||||
|
||||
func TestCacheRefcounts(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newConfigMapStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
manager := &cachingConfigMapManager{
|
||||
configMapStore: store,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
|
||||
s1 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s1"}, envFromNames: []string{"s10"}},
|
||||
{envVarNames: []string{"s2"}},
|
||||
},
|
||||
volumes: []string{"s3"},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name1", s1))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name2", s1))
|
||||
s2 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s4"}},
|
||||
{envVarNames: []string{"s5"}, envFromNames: []string{"s50"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name2", s2))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name3", s2))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name4", s2))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns1", "name3", s2))
|
||||
s3 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s3"}, envFromNames: []string{"s30"}},
|
||||
{envVarNames: []string{"s5"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name5", s3))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name6", s3))
|
||||
s4 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s6"}},
|
||||
{envFromNames: []string{"s60"}},
|
||||
},
|
||||
}
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "name7", s4))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns1", "name7", s4))
|
||||
|
||||
// Also check the Add + Update + Remove scenario.
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s1))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "other-name", s2))
|
||||
manager.UnregisterPod(podWithConfigMaps("ns1", "other-name", s2))
|
||||
|
||||
s5 := configMapsToAttach{
|
||||
containerEnvConfigMaps: []envConfigMaps{
|
||||
{envVarNames: []string{"s7"}},
|
||||
{envFromNames: []string{"s70"}},
|
||||
},
|
||||
}
|
||||
|
||||
// Check the no-op update scenario
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "noop-pod", s5))
|
||||
manager.RegisterPod(podWithConfigMaps("ns1", "noop-pod", s5))
|
||||
|
||||
refs := func(ns, name string) int {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
item, ok := store.items[objectKey{ns, name}]
|
||||
if !ok {
|
||||
return 0
|
||||
}
|
||||
return item.refCount
|
||||
}
|
||||
assert.Equal(t, 1, refs("ns1", "s1"))
|
||||
assert.Equal(t, 1, refs("ns1", "s10"))
|
||||
assert.Equal(t, 1, refs("ns1", "s2"))
|
||||
assert.Equal(t, 3, refs("ns1", "s3"))
|
||||
assert.Equal(t, 2, refs("ns1", "s30"))
|
||||
assert.Equal(t, 2, refs("ns1", "s4"))
|
||||
assert.Equal(t, 4, refs("ns1", "s5"))
|
||||
assert.Equal(t, 2, refs("ns1", "s50"))
|
||||
assert.Equal(t, 0, refs("ns1", "s6"))
|
||||
assert.Equal(t, 0, refs("ns1", "s60"))
|
||||
assert.Equal(t, 1, refs("ns1", "s7"))
|
||||
assert.Equal(t, 1, refs("ns1", "s70"))
|
||||
}
|
||||
|
||||
func TestCachingConfigMapManager(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
configMapStore := newConfigMapStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
manager := &cachingConfigMapManager{
|
||||
configMapStore: configMapStore,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
store := manager.NewObjectStore(getConfigMap(fakeClient), clock.RealClock{}, noObjectTTL, 0)
|
||||
manager := &configMapManager{
|
||||
manager: manager.NewCacheBasedManager(store, getConfigMapNames),
|
||||
}
|
||||
|
||||
// Create a pod with some configMaps.
|
||||
|
@ -543,7 +166,7 @@ func TestCachingConfigMapManager(t *testing.T) {
|
|||
|
||||
for _, ns := range []string{"ns1", "ns2", "ns3"} {
|
||||
for _, configMap := range []string{"s1", "s2", "s3", "s4", "s5", "s6", "s20", "s40", "s50"} {
|
||||
checkConfigMap(t, configMapStore, ns, configMap, shouldExist(ns, configMap))
|
||||
checkObject(t, store, ns, configMap, shouldExist(ns, configMap))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -547,7 +547,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
|
|||
klet.secretManager = secretManager
|
||||
|
||||
configMapManager := configmap.NewCachingConfigMapManager(
|
||||
kubeDeps.KubeClient, configmap.GetObjectTTLFromNodeFunc(klet.GetNode))
|
||||
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
|
||||
klet.configMapManager = configMapManager
|
||||
|
||||
if klet.experimentalHostUserNamespaceDefaulting {
|
||||
|
|
|
@ -66,16 +66,16 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) {
|
|||
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||
}
|
||||
|
||||
// cachingSecretManager keeps a store with secrets necessary
|
||||
// secretManager keeps a store with secrets necessary
|
||||
// for registered pods. Different implementations of the store
|
||||
// may result in different semantics for freshness of secrets
|
||||
// (e.g. ttl-based implementation vs watch-based implementation).
|
||||
type cachingSecretManager struct {
|
||||
type secretManager struct {
|
||||
manager manager.Manager
|
||||
}
|
||||
|
||||
func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
|
||||
object, err := c.manager.GetObject(namespace, name)
|
||||
func (s *secretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
|
||||
object, err := s.manager.GetObject(namespace, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -85,12 +85,12 @@ func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, er
|
|||
return nil, fmt.Errorf("unexpected object type: %v", object)
|
||||
}
|
||||
|
||||
func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
|
||||
c.manager.RegisterPod(pod)
|
||||
func (s *secretManager) RegisterPod(pod *v1.Pod) {
|
||||
s.manager.RegisterPod(pod)
|
||||
}
|
||||
|
||||
func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||
c.manager.UnregisterPod(pod)
|
||||
func (s *secretManager) UnregisterPod(pod *v1.Pod) {
|
||||
s.manager.UnregisterPod(pod)
|
||||
}
|
||||
|
||||
func getSecretNames(pod *v1.Pod) sets.String {
|
||||
|
@ -106,7 +106,7 @@ const (
|
|||
defaultTTL = time.Minute
|
||||
)
|
||||
|
||||
// NewCacheBasedManager creates a manager that keeps a cache of all secrets
|
||||
// NewCachingSecretManager creates a manager that keeps a cache of all secrets
|
||||
// necessary for registered pods.
|
||||
// It implements the following logic:
|
||||
// - whenever a pod is created or updated, the cached versions of all secrets
|
||||
|
@ -119,7 +119,7 @@ func NewCachingSecretManager(kubeClient clientset.Interface, getTTL manager.GetO
|
|||
return kubeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
||||
}
|
||||
secretStore := manager.NewObjectStore(getSecret, clock.RealClock{}, getTTL, defaultTTL)
|
||||
return &cachingSecretManager{
|
||||
return &secretManager{
|
||||
manager: manager.NewCacheBasedManager(secretStore, getSecretNames),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -108,7 +108,7 @@ func podWithSecrets(ns, podName string, toAttach secretsToAttach) *v1.Pod {
|
|||
func TestCacheBasedSecretManager(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := manager.NewObjectStore(getSecret(fakeClient), clock.RealClock{}, noObjectTTL, 0)
|
||||
manager := &cachingSecretManager{
|
||||
manager := &secretManager{
|
||||
manager: manager.NewCacheBasedManager(store, getSecretNames),
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue