Merge pull request #20269 from mqliang/sync-delta-fifo

add a HasSynced() for DeltaFIFO and FIFO, method, which is very helpful for Informer
pull/6/head
Daniel Smith 2016-02-05 15:51:50 -08:00
commit 9b68e8ec2b
5 changed files with 170 additions and 6 deletions

View File

@ -100,6 +100,12 @@ type DeltaFIFO struct {
items map[string]Deltas items map[string]Deltas
queue []string queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item // keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic. // insertion and retrieval, and should be deterministic.
keyFunc KeyFunc keyFunc KeyFunc
@ -141,11 +147,20 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
return f.keyFunc(obj) return f.keyFunc(obj)
} }
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}
// Add inserts an item, and puts it in the queue. The item is only enqueued // Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set. // if it doesn't already exist in the set.
func (f *DeltaFIFO) Add(obj interface{}) error { func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj) return f.queueActionLocked(Added, obj)
} }
@ -153,6 +168,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error {
func (f *DeltaFIFO) Update(obj interface{}) error { func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj) return f.queueActionLocked(Updated, obj)
} }
@ -166,6 +182,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
} }
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil { if f.knownObjects == nil {
if _, exists := f.items[id]; !exists { if _, exists := f.items[id]; !exists {
// Presumably, this was deleted when a relist happened. // Presumably, this was deleted when a relist happened.
@ -203,6 +220,7 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
} }
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; exists { if _, exists := f.items[id]; exists {
return nil return nil
} }
@ -354,6 +372,9 @@ func (f *DeltaFIFO) Pop() interface{} {
id := f.queue[0] id := f.queue[0]
f.queue = f.queue[1:] f.queue = f.queue[1:]
item, ok := f.items[id] item, ok := f.items[id]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
if !ok { if !ok {
// Item may have been deleted subsequently. // Item may have been deleted subsequently.
continue continue
@ -373,6 +394,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
keys := make(sets.String, len(list)) keys := make(sets.String, len(list))
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list)
}
for _, item := range list { for _, item := range list {
key, err := f.KeyOf(item) key, err := f.KeyOf(item)
if err != nil { if err != nil {

View File

@ -327,3 +327,59 @@ func TestDeltaFIFO_KeyOf(t *testing.T) {
} }
} }
} }
func TestDeltaFIFO_HasSynced(t *testing.T) {
tests := []struct {
actions []func(f *DeltaFIFO)
expectedSynced bool
}{
{
actions: []func(f *DeltaFIFO){},
expectedSynced: false,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Add(mkFifoObj("a", 1)) },
},
expectedSynced: true,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{}, "0") },
},
expectedSynced: true,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
},
expectedSynced: false,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *DeltaFIFO) { f.Pop() },
},
expectedSynced: false,
},
{
actions: []func(f *DeltaFIFO){
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *DeltaFIFO) { f.Pop() },
func(f *DeltaFIFO) { f.Pop() },
},
expectedSynced: true,
},
}
for i, test := range tests {
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
for _, action := range test.actions {
action(f)
}
if e, a := test.expectedSynced, f.HasSynced(); a != e {
t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
}
}
}

View File

@ -32,6 +32,9 @@ type Queue interface {
// as nothing else (presumably more recent) // as nothing else (presumably more recent)
// has since been added. // has since been added.
AddIfNotPresent(interface{}) error AddIfNotPresent(interface{}) error
// Return true if the first batch of items has been popped
HasSynced() bool
} }
// FIFO receives adds and updates from a Reflector, and puts them in a queue for // FIFO receives adds and updates from a Reflector, and puts them in a queue for
@ -52,6 +55,13 @@ type FIFO struct {
// We depend on the property that items in the set are in the queue and vice versa. // We depend on the property that items in the set are in the queue and vice versa.
items map[string]interface{} items map[string]interface{}
queue []string queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item insertion and retrieval, and // keyFunc is used to make the key used for queued item insertion and retrieval, and
// should be deterministic. // should be deterministic.
keyFunc KeyFunc keyFunc KeyFunc
@ -61,6 +71,14 @@ var (
_ = Queue(&FIFO{}) // FIFO is a Queue _ = Queue(&FIFO{}) // FIFO is a Queue
) )
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
// or an Update called first but the first batch of items inserted by Replace() has been popped
func (f *FIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}
// Add inserts an item, and puts it in the queue. The item is only enqueued // Add inserts an item, and puts it in the queue. The item is only enqueued
// if it doesn't already exist in the set. // if it doesn't already exist in the set.
func (f *FIFO) Add(obj interface{}) error { func (f *FIFO) Add(obj interface{}) error {
@ -70,6 +88,7 @@ func (f *FIFO) Add(obj interface{}) error {
} }
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; !exists { if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id) f.queue = append(f.queue, id)
} }
@ -91,6 +110,7 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error {
} }
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
if _, exists := f.items[id]; exists { if _, exists := f.items[id]; exists {
return nil return nil
} }
@ -116,6 +136,7 @@ func (f *FIFO) Delete(obj interface{}) error {
} }
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
f.populated = true
delete(f.items, id) delete(f.items, id)
return err return err
} }
@ -174,6 +195,9 @@ func (f *FIFO) Pop() interface{} {
} }
id := f.queue[0] id := f.queue[0]
f.queue = f.queue[1:] f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id] item, ok := f.items[id]
if !ok { if !ok {
// Item may have been deleted subsequently. // Item may have been deleted subsequently.
@ -200,6 +224,12 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
if !f.populated {
f.populated = true
f.initialPopulationCount = len(items)
}
f.items = items f.items = items
f.queue = f.queue[:0] f.queue = f.queue[:0]
for id := range items { for id := range items {

View File

@ -177,3 +177,59 @@ func TestFIFO_addIfNotPresent(t *testing.T) {
} }
} }
} }
func TestFIFO_HasSynced(t *testing.T) {
tests := []struct {
actions []func(f *FIFO)
expectedSynced bool
}{
{
actions: []func(f *FIFO){},
expectedSynced: false,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Add(mkFifoObj("a", 1)) },
},
expectedSynced: true,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{}, "0") },
},
expectedSynced: true,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
},
expectedSynced: false,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *FIFO) { f.Pop() },
},
expectedSynced: false,
},
{
actions: []func(f *FIFO){
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
func(f *FIFO) { f.Pop() },
func(f *FIFO) { f.Pop() },
},
expectedSynced: true,
},
}
for i, test := range tests {
f := NewFIFO(testFifoObjectKeyFunc)
for _, action := range test.actions {
action(f)
}
if e, a := test.expectedSynced, f.HasSynced(); a != e {
t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
}
}
}

View File

@ -99,12 +99,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
// Returns true once this controller has completed an initial resource listing // Returns true once this controller has completed an initial resource listing
func (c *Controller) HasSynced() bool { func (c *Controller) HasSynced() bool {
c.reflectorMutex.RLock() return c.config.Queue.HasSynced()
defer c.reflectorMutex.RUnlock()
if c.reflector == nil {
return false
}
return c.reflector.LastSyncResourceVersion() != ""
} }
// Requeue adds the provided object back into the queue if it does not already exist. // Requeue adds the provided object back into the queue if it does not already exist.