mirror of https://github.com/k3s-io/k3s
Merge pull request #63539 from wojtek-t/refactor_secret_configmap_manager
Automatic merge from submit-queue (batch tested with PRs 63593, 63539). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>. Refactor cachingSecretManager I have a POC of watch-based implementation of SecretManager in https://github.com/kubernetes/kubernetes/pull/63461 This is an initial refactoring that would make that change easier. @yujuhong - if you're fine with this PR, I will do the same for configmaps in the follow up PR.pull/8/head
commit
8de6600a55
|
@ -8,7 +8,7 @@ load(
|
|||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["secret_manager_test.go"],
|
||||
srcs = ["caching_secret_manager_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//vendor/github.com/stretchr/testify/assert:go_default_library",
|
||||
|
@ -25,6 +25,7 @@ go_test(
|
|||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"caching_secret_manager.go",
|
||||
"fake_manager.go",
|
||||
"secret_manager.go",
|
||||
],
|
||||
|
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package secret
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTTL = time.Minute
|
||||
)
|
||||
|
||||
type GetObjectTTLFunc func() (time.Duration, bool)
|
||||
|
||||
// secretStoreItems is a single item stored in secretStore.
|
||||
type secretStoreItem struct {
|
||||
refCount int
|
||||
secret *secretData
|
||||
}
|
||||
|
||||
type secretData struct {
|
||||
sync.Mutex
|
||||
|
||||
secret *v1.Secret
|
||||
err error
|
||||
lastUpdateTime time.Time
|
||||
}
|
||||
|
||||
// secretStore is a local cache of secrets.
|
||||
type secretStore struct {
|
||||
kubeClient clientset.Interface
|
||||
clock clock.Clock
|
||||
|
||||
lock sync.Mutex
|
||||
items map[objectKey]*secretStoreItem
|
||||
|
||||
defaultTTL time.Duration
|
||||
getTTL GetObjectTTLFunc
|
||||
}
|
||||
|
||||
func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore {
|
||||
return &secretStore{
|
||||
kubeClient: kubeClient,
|
||||
clock: clock,
|
||||
items: make(map[objectKey]*secretStoreItem),
|
||||
defaultTTL: ttl,
|
||||
getTTL: getTTL,
|
||||
}
|
||||
}
|
||||
|
||||
func isSecretOlder(newSecret, oldSecret *v1.Secret) bool {
|
||||
if newSecret == nil || oldSecret == nil {
|
||||
return false
|
||||
}
|
||||
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret)
|
||||
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret)
|
||||
return newVersion < oldVersion
|
||||
}
|
||||
|
||||
func (s *secretStore) AddReference(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
// AddReference is called from RegisterPod, thus it needs to be efficient.
|
||||
// Thus Add() is only increasing refCount and generation of a given secret.
|
||||
// Then Get() is responsible for fetching if needed.
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
item = &secretStoreItem{
|
||||
refCount: 0,
|
||||
secret: &secretData{},
|
||||
}
|
||||
s.items[key] = item
|
||||
}
|
||||
|
||||
item.refCount++
|
||||
// This will trigger fetch on the next Get() operation.
|
||||
item.secret = nil
|
||||
}
|
||||
|
||||
func (s *secretStore) DeleteReference(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if item, ok := s.items[key]; ok {
|
||||
item.refCount--
|
||||
if item.refCount == 0 {
|
||||
delete(s.items, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
||||
return func() (time.Duration, bool) {
|
||||
node, err := getNode()
|
||||
if err != nil {
|
||||
return time.Duration(0), false
|
||||
}
|
||||
if node != nil && node.Annotations != nil {
|
||||
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
|
||||
if intValue, err := strconv.Atoi(value); err == nil {
|
||||
return time.Duration(intValue) * time.Second, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return time.Duration(0), false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *secretStore) isSecretFresh(data *secretData) bool {
|
||||
secretTTL := s.defaultTTL
|
||||
if ttl, ok := s.getTTL(); ok {
|
||||
secretTTL = ttl
|
||||
}
|
||||
return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL))
|
||||
}
|
||||
|
||||
func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
data := func() *secretData {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
if item.secret == nil {
|
||||
item.secret = &secretData{}
|
||||
}
|
||||
return item.secret
|
||||
}()
|
||||
if data == nil {
|
||||
return nil, fmt.Errorf("secret %q/%q not registered", namespace, name)
|
||||
}
|
||||
|
||||
// After updating data in secretStore, lock the data, fetch secret if
|
||||
// needed and return data.
|
||||
data.Lock()
|
||||
defer data.Unlock()
|
||||
if data.err != nil || !s.isSecretFresh(data) {
|
||||
opts := metav1.GetOptions{}
|
||||
if data.secret != nil && data.err == nil {
|
||||
// This is just a periodic refresh of a secret we successfully fetched previously.
|
||||
// In this case, server data from apiserver cache to reduce the load on both
|
||||
// etcd and apiserver (the cache is eventually consistent).
|
||||
util.FromApiserverCache(&opts)
|
||||
}
|
||||
secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
||||
if err != nil && !apierrors.IsNotFound(err) && data.secret == nil && data.err == nil {
|
||||
// Couldn't fetch the latest secret, but there is no cached data to return.
|
||||
// Return the fetch result instead.
|
||||
return secret, err
|
||||
}
|
||||
if (err == nil && !isSecretOlder(secret, data.secret)) || apierrors.IsNotFound(err) {
|
||||
// If the fetch succeeded with a newer version of the secret, or if the
|
||||
// secret could not be found in the apiserver, update the cached data to
|
||||
// reflect the current status.
|
||||
data.secret = secret
|
||||
data.err = err
|
||||
data.lastUpdateTime = s.clock.Now()
|
||||
}
|
||||
}
|
||||
return data.secret, data.err
|
||||
}
|
||||
|
||||
// NewCachingSecretManager creates a manager that keeps a cache of all secrets
|
||||
// necessary for registered pods.
|
||||
// It implements the following logic:
|
||||
// - whenever a pod is created or updated, the cached versions of all its secrets
|
||||
// are invalidated
|
||||
// - every GetSecret() call tries to fetch the value from local cache; if it is
|
||||
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||
// value in cache; otherwise it is just fetched from cache
|
||||
func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager {
|
||||
secretStore := newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL)
|
||||
return newCacheBasedSecretManager(secretStore)
|
||||
}
|
|
@ -53,13 +53,13 @@ func noObjectTTL() (time.Duration, bool) {
|
|||
func TestSecretStore(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
store.Add("ns1", "name1")
|
||||
store.Add("ns2", "name2")
|
||||
store.Add("ns1", "name1")
|
||||
store.Add("ns1", "name1")
|
||||
store.Delete("ns1", "name1")
|
||||
store.Delete("ns2", "name2")
|
||||
store.Add("ns3", "name3")
|
||||
store.AddReference("ns1", "name1")
|
||||
store.AddReference("ns2", "name2")
|
||||
store.AddReference("ns1", "name1")
|
||||
store.AddReference("ns1", "name1")
|
||||
store.DeleteReference("ns1", "name1")
|
||||
store.DeleteReference("ns2", "name2")
|
||||
store.AddReference("ns3", "name3")
|
||||
|
||||
// Adds don't issue Get requests.
|
||||
actions := fakeClient.Actions()
|
||||
|
@ -87,7 +87,7 @@ func TestSecretStore(t *testing.T) {
|
|||
func TestSecretStoreDeletingSecret(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
store := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
store.Add("ns", "name")
|
||||
store.AddReference("ns", "name")
|
||||
|
||||
result := &v1.Secret{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "name", ResourceVersion: "10"}}
|
||||
fakeClient.AddReactor("get", "secrets", func(action core.Action) (bool, runtime.Object, error) {
|
||||
|
@ -119,7 +119,7 @@ func TestSecretStoreGetAlwaysRefresh(t *testing.T) {
|
|||
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, 0)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
}
|
||||
fakeClient.ClearActions()
|
||||
|
||||
|
@ -146,7 +146,7 @@ func TestSecretStoreGetNeverRefresh(t *testing.T) {
|
|||
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
store.Add(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
store.AddReference(fmt.Sprintf("ns-%d", i), fmt.Sprintf("name-%d", i))
|
||||
}
|
||||
fakeClient.ClearActions()
|
||||
|
||||
|
@ -175,7 +175,7 @@ func TestCustomTTL(t *testing.T) {
|
|||
fakeClock := clock.NewFakeClock(time.Time{})
|
||||
store := newSecretStore(fakeClient, fakeClock, customTTL, time.Minute)
|
||||
|
||||
store.Add("ns", "name")
|
||||
store.AddReference("ns", "name")
|
||||
store.Get("ns", "name")
|
||||
fakeClient.ClearActions()
|
||||
|
||||
|
@ -345,10 +345,7 @@ func TestCacheInvalidation(t *testing.T) {
|
|||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
manager := &cachingSecretManager{
|
||||
secretStore: store,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
manager := newCacheBasedSecretManager(store)
|
||||
|
||||
// Create a pod with some secrets.
|
||||
s1 := secretsToAttach{
|
||||
|
@ -403,10 +400,7 @@ func TestCacheRefcounts(t *testing.T) {
|
|||
fakeClient := &fake.Clientset{}
|
||||
fakeClock := clock.NewFakeClock(time.Now())
|
||||
store := newSecretStore(fakeClient, fakeClock, noObjectTTL, time.Minute)
|
||||
manager := &cachingSecretManager{
|
||||
secretStore: store,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
manager := newCacheBasedSecretManager(store)
|
||||
|
||||
s1 := secretsToAttach{
|
||||
imagePullSecretNames: []string{"s1"},
|
||||
|
@ -490,10 +484,7 @@ func TestCacheRefcounts(t *testing.T) {
|
|||
func TestCachingSecretManager(t *testing.T) {
|
||||
fakeClient := &fake.Clientset{}
|
||||
secretStore := newSecretStore(fakeClient, clock.RealClock{}, noObjectTTL, 0)
|
||||
manager := &cachingSecretManager{
|
||||
secretStore: secretStore,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
manager := newCacheBasedSecretManager(secretStore)
|
||||
|
||||
// Create a pod with some secrets.
|
||||
s1 := secretsToAttach{
|
|
@ -17,27 +17,16 @@ limitations under the License.
|
|||
package secret
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
storageetcd "k8s.io/apiserver/pkg/storage/etcd"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTTL = time.Minute
|
||||
)
|
||||
|
||||
type Manager interface {
|
||||
// Get secret by secret namespace and name.
|
||||
GetSecret(namespace, name string) (*v1.Secret, error)
|
||||
|
@ -53,6 +42,11 @@ type Manager interface {
|
|||
UnregisterPod(pod *v1.Pod)
|
||||
}
|
||||
|
||||
type objectKey struct {
|
||||
namespace string
|
||||
name string
|
||||
}
|
||||
|
||||
// simpleSecretManager implements SecretManager interfaces with
|
||||
// simple operations to apiserver.
|
||||
type simpleSecretManager struct {
|
||||
|
@ -73,190 +67,41 @@ func (s *simpleSecretManager) RegisterPod(pod *v1.Pod) {
|
|||
func (s *simpleSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||
}
|
||||
|
||||
type GetObjectTTLFunc func() (time.Duration, bool)
|
||||
|
||||
type objectKey struct {
|
||||
namespace string
|
||||
name string
|
||||
// store is the interface for a secrets cache that
|
||||
// can be used by cacheBasedSecretManager.
|
||||
type store interface {
|
||||
// AddReference adds a reference to the secret to the store.
|
||||
// Note that multiple additions to the store has to be allowed
|
||||
// in the implementations and effectively treated as refcounted.
|
||||
AddReference(namespace, name string)
|
||||
// DeleteReference deletes reference to the secret from the store.
|
||||
// Note that secret should be deleted only when there was a
|
||||
// corresponding Delete call for each of Add calls (effectively
|
||||
// when refcount was reduced to zero).
|
||||
DeleteReference(namespace, name string)
|
||||
// Get a secret from a store.
|
||||
Get(namespace, name string) (*v1.Secret, error)
|
||||
}
|
||||
|
||||
// secretStoreItems is a single item stored in secretStore.
|
||||
type secretStoreItem struct {
|
||||
refCount int
|
||||
secret *secretData
|
||||
}
|
||||
|
||||
type secretData struct {
|
||||
sync.Mutex
|
||||
|
||||
secret *v1.Secret
|
||||
err error
|
||||
lastUpdateTime time.Time
|
||||
}
|
||||
|
||||
// secretStore is a local cache of secrets.
|
||||
type secretStore struct {
|
||||
kubeClient clientset.Interface
|
||||
clock clock.Clock
|
||||
|
||||
lock sync.Mutex
|
||||
items map[objectKey]*secretStoreItem
|
||||
|
||||
defaultTTL time.Duration
|
||||
getTTL GetObjectTTLFunc
|
||||
}
|
||||
|
||||
func newSecretStore(kubeClient clientset.Interface, clock clock.Clock, getTTL GetObjectTTLFunc, ttl time.Duration) *secretStore {
|
||||
return &secretStore{
|
||||
kubeClient: kubeClient,
|
||||
clock: clock,
|
||||
items: make(map[objectKey]*secretStoreItem),
|
||||
defaultTTL: ttl,
|
||||
getTTL: getTTL,
|
||||
}
|
||||
}
|
||||
|
||||
func isSecretOlder(newSecret, oldSecret *v1.Secret) bool {
|
||||
if newSecret == nil || oldSecret == nil {
|
||||
return false
|
||||
}
|
||||
newVersion, _ := storageetcd.Versioner.ObjectResourceVersion(newSecret)
|
||||
oldVersion, _ := storageetcd.Versioner.ObjectResourceVersion(oldSecret)
|
||||
return newVersion < oldVersion
|
||||
}
|
||||
|
||||
func (s *secretStore) Add(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
// Add is called from RegisterPod, thus it needs to be efficient.
|
||||
// Thus Add() is only increasing refCount and generation of a given secret.
|
||||
// Then Get() is responsible for fetching if needed.
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
item = &secretStoreItem{
|
||||
refCount: 0,
|
||||
secret: &secretData{},
|
||||
}
|
||||
s.items[key] = item
|
||||
}
|
||||
|
||||
item.refCount++
|
||||
// This will trigger fetch on the next Get() operation.
|
||||
item.secret = nil
|
||||
}
|
||||
|
||||
func (s *secretStore) Delete(namespace, name string) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
if item, ok := s.items[key]; ok {
|
||||
item.refCount--
|
||||
if item.refCount == 0 {
|
||||
delete(s.items, key)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetObjectTTLFromNodeFunc(getNode func() (*v1.Node, error)) GetObjectTTLFunc {
|
||||
return func() (time.Duration, bool) {
|
||||
node, err := getNode()
|
||||
if err != nil {
|
||||
return time.Duration(0), false
|
||||
}
|
||||
if node != nil && node.Annotations != nil {
|
||||
if value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]; ok {
|
||||
if intValue, err := strconv.Atoi(value); err == nil {
|
||||
return time.Duration(intValue) * time.Second, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return time.Duration(0), false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *secretStore) isSecretFresh(data *secretData) bool {
|
||||
secretTTL := s.defaultTTL
|
||||
if ttl, ok := s.getTTL(); ok {
|
||||
secretTTL = ttl
|
||||
}
|
||||
return s.clock.Now().Before(data.lastUpdateTime.Add(secretTTL))
|
||||
}
|
||||
|
||||
func (s *secretStore) Get(namespace, name string) (*v1.Secret, error) {
|
||||
key := objectKey{namespace: namespace, name: name}
|
||||
|
||||
data := func() *secretData {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
item, exists := s.items[key]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
if item.secret == nil {
|
||||
item.secret = &secretData{}
|
||||
}
|
||||
return item.secret
|
||||
}()
|
||||
if data == nil {
|
||||
return nil, fmt.Errorf("secret %q/%q not registered", namespace, name)
|
||||
}
|
||||
|
||||
// After updating data in secretStore, lock the data, fetch secret if
|
||||
// needed and return data.
|
||||
data.Lock()
|
||||
defer data.Unlock()
|
||||
if data.err != nil || !s.isSecretFresh(data) {
|
||||
opts := metav1.GetOptions{}
|
||||
if data.secret != nil && data.err == nil {
|
||||
// This is just a periodic refresh of a secret we successfully fetched previously.
|
||||
// In this case, server data from apiserver cache to reduce the load on both
|
||||
// etcd and apiserver (the cache is eventually consistent).
|
||||
util.FromApiserverCache(&opts)
|
||||
}
|
||||
secret, err := s.kubeClient.CoreV1().Secrets(namespace).Get(name, opts)
|
||||
if err != nil && !apierrors.IsNotFound(err) && data.secret == nil && data.err == nil {
|
||||
// Couldn't fetch the latest secret, but there is no cached data to return.
|
||||
// Return the fetch result instead.
|
||||
return secret, err
|
||||
}
|
||||
if (err == nil && !isSecretOlder(secret, data.secret)) || apierrors.IsNotFound(err) {
|
||||
// If the fetch succeeded with a newer version of the secret, or if the
|
||||
// secret could not be found in the apiserver, update the cached data to
|
||||
// reflect the current status.
|
||||
data.secret = secret
|
||||
data.err = err
|
||||
data.lastUpdateTime = s.clock.Now()
|
||||
}
|
||||
}
|
||||
return data.secret, data.err
|
||||
}
|
||||
|
||||
// cachingSecretManager keeps a cache of all secrets necessary for registered pods.
|
||||
// It implements the following logic:
|
||||
// - whenever a pod is created or updated, the cached versions of all its secrets
|
||||
// are invalidated
|
||||
// - every GetSecret() call tries to fetch the value from local cache; if it is
|
||||
// not there, invalidated or too old, we fetch it from apiserver and refresh the
|
||||
// value in cache; otherwise it is just fetched from cache
|
||||
type cachingSecretManager struct {
|
||||
secretStore *secretStore
|
||||
// cachingBasedSecretManager keeps a store with secrets necessary
|
||||
// for registered pods. Different implementations of the store
|
||||
// may result in different semantics for freshness of secrets
|
||||
// (e.g. ttl-based implementation vs watch-based implementation).
|
||||
type cacheBasedSecretManager struct {
|
||||
secretStore store
|
||||
|
||||
lock sync.Mutex
|
||||
registeredPods map[objectKey]*v1.Pod
|
||||
}
|
||||
|
||||
func NewCachingSecretManager(kubeClient clientset.Interface, getTTL GetObjectTTLFunc) Manager {
|
||||
csm := &cachingSecretManager{
|
||||
secretStore: newSecretStore(kubeClient, clock.RealClock{}, getTTL, defaultTTL),
|
||||
func newCacheBasedSecretManager(secretStore store) Manager {
|
||||
return &cacheBasedSecretManager{
|
||||
secretStore: secretStore,
|
||||
registeredPods: make(map[objectKey]*v1.Pod),
|
||||
}
|
||||
return csm
|
||||
}
|
||||
|
||||
func (c *cachingSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
|
||||
func (c *cacheBasedSecretManager) GetSecret(namespace, name string) (*v1.Secret, error) {
|
||||
return c.secretStore.Get(namespace, name)
|
||||
}
|
||||
|
||||
|
@ -269,12 +114,12 @@ func getSecretNames(pod *v1.Pod) sets.String {
|
|||
return result
|
||||
}
|
||||
|
||||
func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
|
||||
func (c *cacheBasedSecretManager) RegisterPod(pod *v1.Pod) {
|
||||
names := getSecretNames(pod)
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
for name := range names {
|
||||
c.secretStore.Add(pod.Namespace, name)
|
||||
c.secretStore.AddReference(pod.Namespace, name)
|
||||
}
|
||||
var prev *v1.Pod
|
||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||
|
@ -287,12 +132,12 @@ func (c *cachingSecretManager) RegisterPod(pod *v1.Pod) {
|
|||
// names and prev need to have their ref counts decremented. Any that
|
||||
// are only in prev need to be completely removed. This unconditional
|
||||
// call takes care of both cases.
|
||||
c.secretStore.Delete(prev.Namespace, name)
|
||||
c.secretStore.DeleteReference(prev.Namespace, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||
func (c *cacheBasedSecretManager) UnregisterPod(pod *v1.Pod) {
|
||||
var prev *v1.Pod
|
||||
key := objectKey{namespace: pod.Namespace, name: pod.Name}
|
||||
c.lock.Lock()
|
||||
|
@ -301,7 +146,7 @@ func (c *cachingSecretManager) UnregisterPod(pod *v1.Pod) {
|
|||
delete(c.registeredPods, key)
|
||||
if prev != nil {
|
||||
for name := range getSecretNames(prev) {
|
||||
c.secretStore.Delete(prev.Namespace, name)
|
||||
c.secretStore.DeleteReference(prev.Namespace, name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue