Wait until stores are filled before processing service account token events

pull/6/head
Jordan Liggitt 2015-05-19 05:24:17 -04:00
parent d3778f5f5a
commit 49ceb82179
5 changed files with 69 additions and 6 deletions

View File

@ -20,6 +20,7 @@ import (
"errors"
"io"
"reflect"
"sync"
"time"
apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@ -53,8 +54,10 @@ type Reflector struct {
resyncPeriod time.Duration
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is not thread safe as it is not synchronized with access to the store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
}
// NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector
@ -145,7 +148,7 @@ func (r *Reflector) listAndWatch(stopCh <-chan struct{}) {
glog.Errorf("Unable to sync list result: %v", err)
return
}
r.lastSyncResourceVersion = resourceVersion
r.setLastSyncResourceVersion(resourceVersion)
for {
w, err := r.listerWatcher.Watch(resourceVersion)
@ -225,7 +228,7 @@ loop:
glog.Errorf("unable to understand watch event %#v", event)
}
*resourceVersion = meta.ResourceVersion()
r.lastSyncResourceVersion = *resourceVersion
r.setLastSyncResourceVersion(*resourceVersion)
eventCount++
}
}
@ -242,5 +245,13 @@ loop:
// LastSyncResourceVersion is the resource version observed when last sync with the underlying store
// The value returned is not synchronized with access to the underlying store and is not thread-safe
func (r *Reflector) LastSyncResourceVersion() string {
r.lastSyncResourceVersionMutex.RLock()
defer r.lastSyncResourceVersionMutex.RUnlock()
return r.lastSyncResourceVersion
}
func (r *Reflector) setLastSyncResourceVersion(v string) {
r.lastSyncResourceVersionMutex.Lock()
defer r.lastSyncResourceVersionMutex.Unlock()
r.lastSyncResourceVersion = v
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package framework
import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
@ -61,7 +62,9 @@ type ProcessFunc func(obj interface{}) error
// Controller is a generic controller framework.
type Controller struct {
config Config
config Config
reflector *cache.Reflector
reflectorMutex sync.RWMutex
}
// New makes a new Controller from the given Config.
@ -77,16 +80,32 @@ func New(c *Config) *Controller {
// Run blocks; call via go.
func (c *Controller) Run(stopCh <-chan struct{}) {
defer util.HandleCrash()
cache.NewReflector(
r := cache.NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
).RunUntil(stopCh)
)
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
r.RunUntil(stopCh)
util.Until(c.processLoop, time.Second, stopCh)
}
// Returns true once this controller has completed an initial resource listing
func (c *Controller) HasSynced() bool {
c.reflectorMutex.RLock()
defer c.reflectorMutex.RUnlock()
if c.reflector == nil {
return false
}
return c.reflector.LastSyncResourceVersion() != ""
}
// processLoop drains the work queue.
// TODO: Consider doing the processing in parallel. This will require a little thought
// to make sure that we don't end up processing the same object multiple times

View File

@ -214,10 +214,20 @@ func TestHammerController(t *testing.T) {
},
)
if controller.HasSynced() {
t.Errorf("Expected HasSynced() to return false before we started the controller")
}
// Run the controller and run it until we close stop.
stop := make(chan struct{})
go controller.Run(stop)
// Let's wait for the controller to do its initial sync
time.Sleep(100 * time.Millisecond)
if !controller.HasSynced() {
t.Errorf("Expected HasSynced() to return true after the initial sync")
}
wg := sync.WaitGroup{}
const threads = 3
wg.Add(threads)

View File

@ -96,6 +96,9 @@ func NewTokensController(cl client.Interface, options TokensControllerOptions) *
cache.Indexers{"namespace": cache.MetaNamespaceIndexFunc},
)
e.serviceAccountsSynced = e.serviceAccountController.HasSynced
e.secretsSynced = e.secretController.HasSynced
return e
}
@ -112,6 +115,10 @@ type TokensController struct {
// Since we join two objects, we'll watch both of them with controllers.
serviceAccountController *framework.Controller
secretController *framework.Controller
// These are here so tests can inject a 'return true'.
serviceAccountsSynced func() bool
secretsSynced func() bool
}
// Runs controller loops and returns immediately
@ -133,6 +140,9 @@ func (e *TokensController) Stop() {
// serviceAccountAdded reacts to a ServiceAccount creation by creating a corresponding ServiceAccountToken Secret
func (e *TokensController) serviceAccountAdded(obj interface{}) {
if !e.secretsSynced() {
return
}
serviceAccount := obj.(*api.ServiceAccount)
err := e.createSecretIfNeeded(serviceAccount)
if err != nil {
@ -142,6 +152,9 @@ func (e *TokensController) serviceAccountAdded(obj interface{}) {
// serviceAccountUpdated reacts to a ServiceAccount update (or re-list) by ensuring a corresponding ServiceAccountToken Secret exists
func (e *TokensController) serviceAccountUpdated(oldObj interface{}, newObj interface{}) {
if !e.secretsSynced() {
return
}
newServiceAccount := newObj.(*api.ServiceAccount)
err := e.createSecretIfNeeded(newServiceAccount)
if err != nil {
@ -171,6 +184,9 @@ func (e *TokensController) serviceAccountDeleted(obj interface{}) {
// secretAdded reacts to a Secret create by ensuring the referenced ServiceAccount exists, and by adding a token to the secret if needed
func (e *TokensController) secretAdded(obj interface{}) {
if !e.serviceAccountsSynced() {
return
}
secret := obj.(*api.Secret)
serviceAccount, err := e.getServiceAccount(secret)
if err != nil {
@ -188,6 +204,9 @@ func (e *TokensController) secretAdded(obj interface{}) {
// secretUpdated reacts to a Secret update (or re-list) by deleting the secret (if the referenced ServiceAccount does not exist)
func (e *TokensController) secretUpdated(oldObj interface{}, newObj interface{}) {
if !e.serviceAccountsSynced() {
return
}
newSecret := newObj.(*api.Secret)
newServiceAccount, err := e.getServiceAccount(newSecret)
if err != nil {

View File

@ -335,6 +335,10 @@ func TestTokenCreation(t *testing.T) {
controller := NewTokensController(client, DefaultTokenControllerOptions(generator))
// Tell the token controller its stores have been synced
controller.serviceAccountsSynced = func() bool { return true }
controller.secretsSynced = func() bool { return true }
if tc.ExistingServiceAccount != nil {
controller.serviceAccounts.Add(tc.ExistingServiceAccount)
}