diff --git a/cmd/kube-controller-manager/app/bootstrap.go b/cmd/kube-controller-manager/app/bootstrap.go index 38e066523f..aeb8405612 100644 --- a/cmd/kube-controller-manager/app/bootstrap.go +++ b/cmd/kube-controller-manager/app/bootstrap.go @@ -39,6 +39,7 @@ func startBootstrapSignerController(ctx ControllerContext) (bool, error) { func startTokenCleanerController(ctx ControllerContext) (bool, error) { tcc, err := bootstrap.NewTokenCleaner( ctx.ClientBuilder.ClientGoClientOrDie("token-cleaner"), + ctx.InformerFactory.Core().V1().Secrets(), bootstrap.DefaultTokenCleanerOptions(), ) if err != nil { diff --git a/pkg/controller/bootstrap/BUILD b/pkg/controller/bootstrap/BUILD index e4ef02a20b..f5850a6fba 100644 --- a/pkg/controller/bootstrap/BUILD +++ b/pkg/controller/bootstrap/BUILD @@ -54,12 +54,9 @@ go_library( "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/fields:go_default_library", "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library", "//vendor/k8s.io/client-go/informers/core/v1:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", diff --git a/pkg/controller/bootstrap/tokencleaner.go b/pkg/controller/bootstrap/tokencleaner.go index 6c099a4c73..34a91e492c 100644 --- a/pkg/controller/bootstrap/tokencleaner.go +++ b/pkg/controller/bootstrap/tokencleaner.go @@ -17,21 +17,23 @@ limitations under the License. package bootstrap import ( + "fmt" "time" "github.com/golang/glog" - "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" + coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" api "k8s.io/kubernetes/pkg/apis/core" bootstrapapi "k8s.io/kubernetes/pkg/bootstrap/api" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/util/metrics" ) @@ -59,57 +61,128 @@ type TokenCleaner struct { client clientset.Interface - secrets cache.Store - secretsController cache.Controller + // secretLister is able to list/get secrets and is populated by the shared informer passed to NewTokenCleaner. + secretLister corelisters.SecretLister + + // secretSynced returns true if the secret shared informer has been synced at least once. + secretSynced cache.InformerSynced + + queue workqueue.RateLimitingInterface } // NewTokenCleaner returns a new *NewTokenCleaner. -// -// TODO: Switch to shared informers -func NewTokenCleaner(cl clientset.Interface, options TokenCleanerOptions) (*TokenCleaner, error) { +func NewTokenCleaner(cl clientset.Interface, secrets coreinformers.SecretInformer, options TokenCleanerOptions) (*TokenCleaner, error) { e := &TokenCleaner{ client: cl, + secretLister: secrets.Lister(), + secretSynced: secrets.Informer().HasSynced, tokenSecretNamespace: options.TokenSecretNamespace, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "token_cleaner"), } + if cl.CoreV1().RESTClient().GetRateLimiter() != nil { if err := metrics.RegisterMetricAndTrackRateLimiterUsage("token_cleaner", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil { return nil, err } } - secretSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(bootstrapapi.SecretTypeBootstrapToken)}) - e.secrets, e.secretsController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: func(lo metav1.ListOptions) (runtime.Object, error) { - lo.FieldSelector = secretSelector.String() - return e.client.CoreV1().Secrets(e.tokenSecretNamespace).List(lo) + secrets.Informer().AddEventHandlerWithResyncPeriod( + cache.FilteringResourceEventHandler{ + FilterFunc: func(obj interface{}) bool { + switch t := obj.(type) { + case *v1.Secret: + return t.Type == bootstrapapi.SecretTypeBootstrapToken && t.Namespace == e.tokenSecretNamespace + default: + utilruntime.HandleError(fmt.Errorf("object passed to %T that is not expected: %T", e, obj)) + return false + } }, - WatchFunc: func(lo metav1.ListOptions) (watch.Interface, error) { - lo.FieldSelector = secretSelector.String() - return e.client.CoreV1().Secrets(e.tokenSecretNamespace).Watch(lo) + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: e.enqueueSecrets, + UpdateFunc: func(oldSecret, newSecret interface{}) { e.enqueueSecrets(newSecret) }, }, }, - &v1.Secret{}, options.SecretResync, - cache.ResourceEventHandlerFuncs{ - AddFunc: e.evalSecret, - UpdateFunc: func(oldSecret, newSecret interface{}) { e.evalSecret(newSecret) }, - }, ) + return e, nil } // Run runs controller loops and returns when they are done func (tc *TokenCleaner) Run(stopCh <-chan struct{}) { - go tc.secretsController.Run(stopCh) - go wait.Until(tc.evalSecrets, 10*time.Second, stopCh) + defer utilruntime.HandleCrash() + defer tc.queue.ShutDown() + + glog.Infof("Starting token cleaner controller") + defer glog.Infof("Shutting down token cleaner controller") + + if !controller.WaitForCacheSync("token_cleaner", stopCh, tc.secretSynced) { + return + } + + go wait.Until(tc.worker, 10*time.Second, stopCh) + <-stopCh } -func (tc *TokenCleaner) evalSecrets() { - for _, obj := range tc.secrets.List() { - tc.evalSecret(obj) +func (tc *TokenCleaner) enqueueSecrets(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(err) + return } + tc.queue.Add(key) +} + +// worker runs a thread that dequeues secrets, handles them, and marks them done. +func (tc *TokenCleaner) worker() { + for tc.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (tc *TokenCleaner) processNextWorkItem() bool { + key, quit := tc.queue.Get() + if quit { + return false + } + defer tc.queue.Done(key) + + if err := tc.syncFunc(key.(string)); err != nil { + tc.queue.AddRateLimited(key) + utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", key, err)) + return true + } + + tc.queue.Forget(key) + return true +} + +func (tc *TokenCleaner) syncFunc(key string) error { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing secret %q (%v)", key, time.Now().Sub(startTime)) + }() + + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + + ret, err := tc.secretLister.Secrets(namespace).Get(name) + if apierrors.IsNotFound(err) { + glog.V(3).Infof("secret has been deleted: %v", key) + return nil + } + + if err != nil { + return err + } + + if ret.Type == bootstrapapi.SecretTypeBootstrapToken { + tc.evalSecret(ret) + } + return nil } func (tc *TokenCleaner) evalSecret(o interface{}) { diff --git a/pkg/controller/bootstrap/tokencleaner_test.go b/pkg/controller/bootstrap/tokencleaner_test.go index 47059dd4d1..5fddd7980f 100644 --- a/pkg/controller/bootstrap/tokencleaner_test.go +++ b/pkg/controller/bootstrap/tokencleaner_test.go @@ -23,6 +23,8 @@ import ( "github.com/davecgh/go-spew/spew" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/informers" + coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" api "k8s.io/kubernetes/pkg/apis/core" @@ -32,24 +34,26 @@ func init() { spew.Config.DisableMethods = true } -func newTokenCleaner() (*TokenCleaner, *fake.Clientset, error) { +func newTokenCleaner() (*TokenCleaner, *fake.Clientset, coreinformers.SecretInformer, error) { options := DefaultTokenCleanerOptions() cl := fake.NewSimpleClientset() - tcc, err := NewTokenCleaner(cl, options) + informerFactory := informers.NewSharedInformerFactory(cl, options.SecretResync) + secrets := informerFactory.Core().V1().Secrets() + tcc, err := NewTokenCleaner(cl, secrets, options) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return tcc, cl, nil + return tcc, cl, secrets, nil } func TestCleanerNoExpiration(t *testing.T) { - cleaner, cl, err := newTokenCleaner() + cleaner, cl, secrets, err := newTokenCleaner() if err != nil { t.Fatalf("error creating TokenCleaner: %v", err) } secret := newTokenSecret("tokenID", "tokenSecret") - cleaner.secrets.Add(secret) + secrets.Informer().GetIndexer().Add(secret) cleaner.evalSecret(secret) @@ -59,14 +63,14 @@ func TestCleanerNoExpiration(t *testing.T) { } func TestCleanerExpired(t *testing.T) { - cleaner, cl, err := newTokenCleaner() + cleaner, cl, secrets, err := newTokenCleaner() if err != nil { t.Fatalf("error creating TokenCleaner: %v", err) } secret := newTokenSecret("tokenID", "tokenSecret") addSecretExpiration(secret, timeString(-time.Hour)) - cleaner.secrets.Add(secret) + secrets.Informer().GetIndexer().Add(secret) cleaner.evalSecret(secret) @@ -81,14 +85,14 @@ func TestCleanerExpired(t *testing.T) { } func TestCleanerNotExpired(t *testing.T) { - cleaner, cl, err := newTokenCleaner() + cleaner, cl, secrets, err := newTokenCleaner() if err != nil { t.Fatalf("error creating TokenCleaner: %v", err) } secret := newTokenSecret("tokenID", "tokenSecret") addSecretExpiration(secret, timeString(time.Hour)) - cleaner.secrets.Add(secret) + secrets.Informer().GetIndexer().Add(secret) cleaner.evalSecret(secret)