From 8e615df0005d56292714ae74fe0369580aa21a04 Mon Sep 17 00:00:00 2001 From: mqliang Date: Wed, 3 Feb 2016 11:03:31 +0800 Subject: [PATCH] fix the HasSynced() bug for Informer --- pkg/client/cache/delta_fifo.go | 27 +++++++++++++ pkg/client/cache/delta_fifo_test.go | 56 ++++++++++++++++++++++++++ pkg/client/cache/fifo.go | 30 ++++++++++++++ pkg/client/cache/fifo_test.go | 56 ++++++++++++++++++++++++++ pkg/controller/framework/controller.go | 7 +--- 5 files changed, 170 insertions(+), 6 deletions(-) diff --git a/pkg/client/cache/delta_fifo.go b/pkg/client/cache/delta_fifo.go index fcf2c31371..e7cc1aad19 100644 --- a/pkg/client/cache/delta_fifo.go +++ b/pkg/client/cache/delta_fifo.go @@ -100,6 +100,12 @@ type DeltaFIFO struct { items map[string]Deltas queue []string + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + // keyFunc is used to make the key used for queued item // insertion and retrieval, and should be deterministic. keyFunc KeyFunc @@ -141,11 +147,20 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) { return f.keyFunc(obj) } +// Return true if an Add/Update/Delete/AddIfNotPresent are called first, +// or an Update called first but the first batch of items inserted by Replace() has been popped +func (f *DeltaFIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.populated && f.initialPopulationCount == 0 +} + // Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn't already exist in the set. func (f *DeltaFIFO) Add(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() + f.populated = true return f.queueActionLocked(Added, obj) } @@ -153,6 +168,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error { func (f *DeltaFIFO) Update(obj interface{}) error { f.lock.Lock() defer f.lock.Unlock() + f.populated = true return f.queueActionLocked(Updated, obj) } @@ -166,6 +182,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.populated = true if f.knownObjects == nil { if _, exists := f.items[id]; !exists { // Presumably, this was deleted when a relist happened. @@ -203,6 +220,7 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.populated = true if _, exists := f.items[id]; exists { return nil } @@ -354,6 +372,9 @@ func (f *DeltaFIFO) Pop() interface{} { id := f.queue[0] f.queue = f.queue[1:] item, ok := f.items[id] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } if !ok { // Item may have been deleted subsequently. continue @@ -373,6 +394,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() keys := make(sets.String, len(list)) + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(list) + } + for _, item := range list { key, err := f.KeyOf(item) if err != nil { diff --git a/pkg/client/cache/delta_fifo_test.go b/pkg/client/cache/delta_fifo_test.go index 6adec86057..8efd982b5a 100644 --- a/pkg/client/cache/delta_fifo_test.go +++ b/pkg/client/cache/delta_fifo_test.go @@ -327,3 +327,59 @@ func TestDeltaFIFO_KeyOf(t *testing.T) { } } } + +func TestDeltaFIFO_HasSynced(t *testing.T) { + tests := []struct { + actions []func(f *DeltaFIFO) + expectedSynced bool + }{ + { + actions: []func(f *DeltaFIFO){}, + expectedSynced: false, + }, + { + actions: []func(f *DeltaFIFO){ + func(f *DeltaFIFO) { f.Add(mkFifoObj("a", 1)) }, + }, + expectedSynced: true, + }, + { + actions: []func(f *DeltaFIFO){ + func(f *DeltaFIFO) { f.Replace([]interface{}{}, "0") }, + }, + expectedSynced: true, + }, + { + actions: []func(f *DeltaFIFO){ + func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + }, + expectedSynced: false, + }, + { + actions: []func(f *DeltaFIFO){ + func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *DeltaFIFO) { f.Pop() }, + }, + expectedSynced: false, + }, + { + actions: []func(f *DeltaFIFO){ + func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *DeltaFIFO) { f.Pop() }, + func(f *DeltaFIFO) { f.Pop() }, + }, + expectedSynced: true, + }, + } + + for i, test := range tests { + f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil) + + for _, action := range test.actions { + action(f) + } + if e, a := test.expectedSynced, f.HasSynced(); a != e { + t.Errorf("test case %v failed, expected: %v , got %v", i, e, a) + } + } +} diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index b56687ee0f..d4076a326d 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -32,6 +32,9 @@ type Queue interface { // as nothing else (presumably more recent) // has since been added. AddIfNotPresent(interface{}) error + + // Return true if the first batch of items has been popped + HasSynced() bool } // FIFO receives adds and updates from a Reflector, and puts them in a queue for @@ -52,6 +55,13 @@ type FIFO struct { // We depend on the property that items in the set are in the queue and vice versa. items map[string]interface{} queue []string + + // populated is true if the first batch of items inserted by Replace() has been populated + // or Delete/Add/Update was called first. + populated bool + // initialPopulationCount is the number of items inserted by the first call of Replace() + initialPopulationCount int + // keyFunc is used to make the key used for queued item insertion and retrieval, and // should be deterministic. keyFunc KeyFunc @@ -61,6 +71,14 @@ var ( _ = Queue(&FIFO{}) // FIFO is a Queue ) +// Return true if an Add/Update/Delete/AddIfNotPresent are called first, +// or an Update called first but the first batch of items inserted by Replace() has been popped +func (f *FIFO) HasSynced() bool { + f.lock.Lock() + defer f.lock.Unlock() + return f.populated && f.initialPopulationCount == 0 +} + // Add inserts an item, and puts it in the queue. The item is only enqueued // if it doesn't already exist in the set. func (f *FIFO) Add(obj interface{}) error { @@ -70,6 +88,7 @@ func (f *FIFO) Add(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.populated = true if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } @@ -91,6 +110,7 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.populated = true if _, exists := f.items[id]; exists { return nil } @@ -116,6 +136,7 @@ func (f *FIFO) Delete(obj interface{}) error { } f.lock.Lock() defer f.lock.Unlock() + f.populated = true delete(f.items, id) return err } @@ -174,6 +195,9 @@ func (f *FIFO) Pop() interface{} { } id := f.queue[0] f.queue = f.queue[1:] + if f.initialPopulationCount > 0 { + f.initialPopulationCount-- + } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. @@ -200,6 +224,12 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() + + if !f.populated { + f.populated = true + f.initialPopulationCount = len(items) + } + f.items = items f.queue = f.queue[:0] for id := range items { diff --git a/pkg/client/cache/fifo_test.go b/pkg/client/cache/fifo_test.go index bf99b5aa15..974fa6d3b7 100644 --- a/pkg/client/cache/fifo_test.go +++ b/pkg/client/cache/fifo_test.go @@ -177,3 +177,59 @@ func TestFIFO_addIfNotPresent(t *testing.T) { } } } + +func TestFIFO_HasSynced(t *testing.T) { + tests := []struct { + actions []func(f *FIFO) + expectedSynced bool + }{ + { + actions: []func(f *FIFO){}, + expectedSynced: false, + }, + { + actions: []func(f *FIFO){ + func(f *FIFO) { f.Add(mkFifoObj("a", 1)) }, + }, + expectedSynced: true, + }, + { + actions: []func(f *FIFO){ + func(f *FIFO) { f.Replace([]interface{}{}, "0") }, + }, + expectedSynced: true, + }, + { + actions: []func(f *FIFO){ + func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + }, + expectedSynced: false, + }, + { + actions: []func(f *FIFO){ + func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *FIFO) { f.Pop() }, + }, + expectedSynced: false, + }, + { + actions: []func(f *FIFO){ + func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") }, + func(f *FIFO) { f.Pop() }, + func(f *FIFO) { f.Pop() }, + }, + expectedSynced: true, + }, + } + + for i, test := range tests { + f := NewFIFO(testFifoObjectKeyFunc) + + for _, action := range test.actions { + action(f) + } + if e, a := test.expectedSynced, f.HasSynced(); a != e { + t.Errorf("test case %v failed, expected: %v , got %v", i, e, a) + } + } +} diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go index ca182343aa..c789119e87 100644 --- a/pkg/controller/framework/controller.go +++ b/pkg/controller/framework/controller.go @@ -98,12 +98,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { // Returns true once this controller has completed an initial resource listing func (c *Controller) HasSynced() bool { - c.reflectorMutex.RLock() - defer c.reflectorMutex.RUnlock() - if c.reflector == nil { - return false - } - return c.reflector.LastSyncResourceVersion() != "" + return c.config.Queue.HasSynced() } // Requeue adds the provided object back into the queue if it does not already exist.