mirror of https://github.com/k3s-io/k3s
Merge pull request #40815 from ncdc/delta-fifo-atomic-resync
Automatic merge from submit-queue (batch tested with PRs 40873, 40948, 39580, 41065, 40815) Make DeltaFIFO Resync atomic Make DeltaFIFO's Resync operation atomic, so it enqueues the entire queue before allowing adds/updates/deletes. I'm hoping to use this to help with custom resync periods for multiple event handlers against a single shared informer (see https://github.com/kubernetes/kubernetes/pull/40759#pullrequestreview-19598213 for the motivation). @lavalamp @smarterclayton @deads2k @liggitt @sttts @timothysc @wojtek-t @gmarek @kubernetes/sig-api-machinery-pr-reviews @kubernetes/sig-scalability-pr-reviewspull/6/head
commit
bd17091e16
|
@ -505,14 +505,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
|||
|
||||
// Resync will send a sync event for each item
|
||||
func (f *DeltaFIFO) Resync() error {
|
||||
var keys []string
|
||||
func() {
|
||||
f.lock.RLock()
|
||||
defer f.lock.RUnlock()
|
||||
keys = f.knownObjects.ListKeys()
|
||||
}()
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
keys := f.knownObjects.ListKeys()
|
||||
for _, k := range keys {
|
||||
if err := f.syncKey(k); err != nil {
|
||||
if err := f.syncKeyLocked(k); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -522,6 +520,11 @@ func (f *DeltaFIFO) Resync() error {
|
|||
func (f *DeltaFIFO) syncKey(key string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
return f.syncKeyLocked(key)
|
||||
}
|
||||
|
||||
func (f *DeltaFIFO) syncKeyLocked(key string) error {
|
||||
obj, exists, err := f.knownObjects.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
|
||||
|
|
Loading…
Reference in New Issue