mirror of https://github.com/k3s-io/k3s
fix the HasSynced() bug for Informer
parent
a81fa2f926
commit
8e615df000
|
@ -100,6 +100,12 @@ type DeltaFIFO struct {
|
||||||
items map[string]Deltas
|
items map[string]Deltas
|
||||||
queue []string
|
queue []string
|
||||||
|
|
||||||
|
// populated is true if the first batch of items inserted by Replace() has been populated
|
||||||
|
// or Delete/Add/Update was called first.
|
||||||
|
populated bool
|
||||||
|
// initialPopulationCount is the number of items inserted by the first call of Replace()
|
||||||
|
initialPopulationCount int
|
||||||
|
|
||||||
// keyFunc is used to make the key used for queued item
|
// keyFunc is used to make the key used for queued item
|
||||||
// insertion and retrieval, and should be deterministic.
|
// insertion and retrieval, and should be deterministic.
|
||||||
keyFunc KeyFunc
|
keyFunc KeyFunc
|
||||||
|
@ -141,11 +147,20 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
|
||||||
return f.keyFunc(obj)
|
return f.keyFunc(obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||||
|
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
||||||
|
func (f *DeltaFIFO) HasSynced() bool {
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
return f.populated && f.initialPopulationCount == 0
|
||||||
|
}
|
||||||
|
|
||||||
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
||||||
// if it doesn't already exist in the set.
|
// if it doesn't already exist in the set.
|
||||||
func (f *DeltaFIFO) Add(obj interface{}) error {
|
func (f *DeltaFIFO) Add(obj interface{}) error {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
return f.queueActionLocked(Added, obj)
|
return f.queueActionLocked(Added, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,6 +168,7 @@ func (f *DeltaFIFO) Add(obj interface{}) error {
|
||||||
func (f *DeltaFIFO) Update(obj interface{}) error {
|
func (f *DeltaFIFO) Update(obj interface{}) error {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
return f.queueActionLocked(Updated, obj)
|
return f.queueActionLocked(Updated, obj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,6 +182,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
|
||||||
}
|
}
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
if f.knownObjects == nil {
|
if f.knownObjects == nil {
|
||||||
if _, exists := f.items[id]; !exists {
|
if _, exists := f.items[id]; !exists {
|
||||||
// Presumably, this was deleted when a relist happened.
|
// Presumably, this was deleted when a relist happened.
|
||||||
|
@ -203,6 +220,7 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
|
||||||
}
|
}
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
if _, exists := f.items[id]; exists {
|
if _, exists := f.items[id]; exists {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -354,6 +372,9 @@ func (f *DeltaFIFO) Pop() interface{} {
|
||||||
id := f.queue[0]
|
id := f.queue[0]
|
||||||
f.queue = f.queue[1:]
|
f.queue = f.queue[1:]
|
||||||
item, ok := f.items[id]
|
item, ok := f.items[id]
|
||||||
|
if f.initialPopulationCount > 0 {
|
||||||
|
f.initialPopulationCount--
|
||||||
|
}
|
||||||
if !ok {
|
if !ok {
|
||||||
// Item may have been deleted subsequently.
|
// Item may have been deleted subsequently.
|
||||||
continue
|
continue
|
||||||
|
@ -373,6 +394,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
keys := make(sets.String, len(list))
|
keys := make(sets.String, len(list))
|
||||||
|
|
||||||
|
if !f.populated {
|
||||||
|
f.populated = true
|
||||||
|
f.initialPopulationCount = len(list)
|
||||||
|
}
|
||||||
|
|
||||||
for _, item := range list {
|
for _, item := range list {
|
||||||
key, err := f.KeyOf(item)
|
key, err := f.KeyOf(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -327,3 +327,59 @@ func TestDeltaFIFO_KeyOf(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeltaFIFO_HasSynced(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
actions []func(f *DeltaFIFO)
|
||||||
|
expectedSynced bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
actions: []func(f *DeltaFIFO){},
|
||||||
|
expectedSynced: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *DeltaFIFO){
|
||||||
|
func(f *DeltaFIFO) { f.Add(mkFifoObj("a", 1)) },
|
||||||
|
},
|
||||||
|
expectedSynced: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *DeltaFIFO){
|
||||||
|
func(f *DeltaFIFO) { f.Replace([]interface{}{}, "0") },
|
||||||
|
},
|
||||||
|
expectedSynced: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *DeltaFIFO){
|
||||||
|
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
|
||||||
|
},
|
||||||
|
expectedSynced: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *DeltaFIFO){
|
||||||
|
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
|
||||||
|
func(f *DeltaFIFO) { f.Pop() },
|
||||||
|
},
|
||||||
|
expectedSynced: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *DeltaFIFO){
|
||||||
|
func(f *DeltaFIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
|
||||||
|
func(f *DeltaFIFO) { f.Pop() },
|
||||||
|
func(f *DeltaFIFO) { f.Pop() },
|
||||||
|
},
|
||||||
|
expectedSynced: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||||
|
|
||||||
|
for _, action := range test.actions {
|
||||||
|
action(f)
|
||||||
|
}
|
||||||
|
if e, a := test.expectedSynced, f.HasSynced(); a != e {
|
||||||
|
t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -32,6 +32,9 @@ type Queue interface {
|
||||||
// as nothing else (presumably more recent)
|
// as nothing else (presumably more recent)
|
||||||
// has since been added.
|
// has since been added.
|
||||||
AddIfNotPresent(interface{}) error
|
AddIfNotPresent(interface{}) error
|
||||||
|
|
||||||
|
// Return true if the first batch of items has been popped
|
||||||
|
HasSynced() bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
|
// FIFO receives adds and updates from a Reflector, and puts them in a queue for
|
||||||
|
@ -52,6 +55,13 @@ type FIFO struct {
|
||||||
// We depend on the property that items in the set are in the queue and vice versa.
|
// We depend on the property that items in the set are in the queue and vice versa.
|
||||||
items map[string]interface{}
|
items map[string]interface{}
|
||||||
queue []string
|
queue []string
|
||||||
|
|
||||||
|
// populated is true if the first batch of items inserted by Replace() has been populated
|
||||||
|
// or Delete/Add/Update was called first.
|
||||||
|
populated bool
|
||||||
|
// initialPopulationCount is the number of items inserted by the first call of Replace()
|
||||||
|
initialPopulationCount int
|
||||||
|
|
||||||
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
// keyFunc is used to make the key used for queued item insertion and retrieval, and
|
||||||
// should be deterministic.
|
// should be deterministic.
|
||||||
keyFunc KeyFunc
|
keyFunc KeyFunc
|
||||||
|
@ -61,6 +71,14 @@ var (
|
||||||
_ = Queue(&FIFO{}) // FIFO is a Queue
|
_ = Queue(&FIFO{}) // FIFO is a Queue
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||||
|
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
||||||
|
func (f *FIFO) HasSynced() bool {
|
||||||
|
f.lock.Lock()
|
||||||
|
defer f.lock.Unlock()
|
||||||
|
return f.populated && f.initialPopulationCount == 0
|
||||||
|
}
|
||||||
|
|
||||||
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
// Add inserts an item, and puts it in the queue. The item is only enqueued
|
||||||
// if it doesn't already exist in the set.
|
// if it doesn't already exist in the set.
|
||||||
func (f *FIFO) Add(obj interface{}) error {
|
func (f *FIFO) Add(obj interface{}) error {
|
||||||
|
@ -70,6 +88,7 @@ func (f *FIFO) Add(obj interface{}) error {
|
||||||
}
|
}
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
if _, exists := f.items[id]; !exists {
|
if _, exists := f.items[id]; !exists {
|
||||||
f.queue = append(f.queue, id)
|
f.queue = append(f.queue, id)
|
||||||
}
|
}
|
||||||
|
@ -91,6 +110,7 @@ func (f *FIFO) AddIfNotPresent(obj interface{}) error {
|
||||||
}
|
}
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
if _, exists := f.items[id]; exists {
|
if _, exists := f.items[id]; exists {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -116,6 +136,7 @@ func (f *FIFO) Delete(obj interface{}) error {
|
||||||
}
|
}
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
f.populated = true
|
||||||
delete(f.items, id)
|
delete(f.items, id)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -174,6 +195,9 @@ func (f *FIFO) Pop() interface{} {
|
||||||
}
|
}
|
||||||
id := f.queue[0]
|
id := f.queue[0]
|
||||||
f.queue = f.queue[1:]
|
f.queue = f.queue[1:]
|
||||||
|
if f.initialPopulationCount > 0 {
|
||||||
|
f.initialPopulationCount--
|
||||||
|
}
|
||||||
item, ok := f.items[id]
|
item, ok := f.items[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
// Item may have been deleted subsequently.
|
// Item may have been deleted subsequently.
|
||||||
|
@ -200,6 +224,12 @@ func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||||
|
|
||||||
f.lock.Lock()
|
f.lock.Lock()
|
||||||
defer f.lock.Unlock()
|
defer f.lock.Unlock()
|
||||||
|
|
||||||
|
if !f.populated {
|
||||||
|
f.populated = true
|
||||||
|
f.initialPopulationCount = len(items)
|
||||||
|
}
|
||||||
|
|
||||||
f.items = items
|
f.items = items
|
||||||
f.queue = f.queue[:0]
|
f.queue = f.queue[:0]
|
||||||
for id := range items {
|
for id := range items {
|
||||||
|
|
|
@ -177,3 +177,59 @@ func TestFIFO_addIfNotPresent(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFIFO_HasSynced(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
actions []func(f *FIFO)
|
||||||
|
expectedSynced bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
actions: []func(f *FIFO){},
|
||||||
|
expectedSynced: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *FIFO){
|
||||||
|
func(f *FIFO) { f.Add(mkFifoObj("a", 1)) },
|
||||||
|
},
|
||||||
|
expectedSynced: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *FIFO){
|
||||||
|
func(f *FIFO) { f.Replace([]interface{}{}, "0") },
|
||||||
|
},
|
||||||
|
expectedSynced: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *FIFO){
|
||||||
|
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
|
||||||
|
},
|
||||||
|
expectedSynced: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *FIFO){
|
||||||
|
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
|
||||||
|
func(f *FIFO) { f.Pop() },
|
||||||
|
},
|
||||||
|
expectedSynced: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
actions: []func(f *FIFO){
|
||||||
|
func(f *FIFO) { f.Replace([]interface{}{mkFifoObj("a", 1), mkFifoObj("b", 2)}, "0") },
|
||||||
|
func(f *FIFO) { f.Pop() },
|
||||||
|
func(f *FIFO) { f.Pop() },
|
||||||
|
},
|
||||||
|
expectedSynced: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, test := range tests {
|
||||||
|
f := NewFIFO(testFifoObjectKeyFunc)
|
||||||
|
|
||||||
|
for _, action := range test.actions {
|
||||||
|
action(f)
|
||||||
|
}
|
||||||
|
if e, a := test.expectedSynced, f.HasSynced(); a != e {
|
||||||
|
t.Errorf("test case %v failed, expected: %v , got %v", i, e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -98,12 +98,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
|
||||||
|
|
||||||
// Returns true once this controller has completed an initial resource listing
|
// Returns true once this controller has completed an initial resource listing
|
||||||
func (c *Controller) HasSynced() bool {
|
func (c *Controller) HasSynced() bool {
|
||||||
c.reflectorMutex.RLock()
|
return c.config.Queue.HasSynced()
|
||||||
defer c.reflectorMutex.RUnlock()
|
|
||||||
if c.reflector == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return c.reflector.LastSyncResourceVersion() != ""
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Requeue adds the provided object back into the queue if it does not already exist.
|
// Requeue adds the provided object back into the queue if it does not already exist.
|
||||||
|
|
Loading…
Reference in New Issue