mirror of https://github.com/k3s-io/k3s
Fix bugs in DeltaFIFO
parent
3c822c0b3c
commit
ccd42e9236
|
@ -189,12 +189,20 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
|
|||
// Don't provide a second report of the same deletion.
|
||||
return nil
|
||||
}
|
||||
} else if _, exists, err := f.knownObjects.GetByKey(id); err == nil && !exists {
|
||||
// Presumably, this was deleted when a relist happened.
|
||||
// Don't provide a second report of the same deletion.
|
||||
// TODO(lavalamp): This may be racy-- we aren't properly locked
|
||||
// with knownObjects.
|
||||
return nil
|
||||
} else {
|
||||
// We only want to skip the "deletion" action if the object doesn't
|
||||
// exist in knownObjects and it doesn't have corresponding item in items.
|
||||
// Note that even if there is a "deletion" action in items, we can ignore it,
|
||||
// because it will be deduped automatically in "queueActionLocked"
|
||||
_, exists, err := f.knownObjects.GetByKey(id)
|
||||
_, itemsExist := f.items[id]
|
||||
if err == nil && !exists && !itemsExist {
|
||||
// Presumably, this was deleted when a relist happened.
|
||||
// Don't provide a second report of the same deletion.
|
||||
// TODO(lavalamp): This may be racy-- we aren't properly locked
|
||||
// with knownObjects.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return f.queueActionLocked(Deleted, obj)
|
||||
|
@ -270,6 +278,13 @@ func isDeletionDup(a, b *Delta) *Delta {
|
|||
return b
|
||||
}
|
||||
|
||||
// willObjectBeDeletedLocked returns true only if the last delta for the
|
||||
// given object is Delete. Caller must lock first.
|
||||
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
|
||||
deltas := f.items[id]
|
||||
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
|
||||
}
|
||||
|
||||
// queueActionLocked appends to the delta list for the object, calling
|
||||
// f.deltaCompressor if needed. Caller must lock first.
|
||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
||||
|
@ -277,6 +292,14 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
|||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
|
||||
// If object is supposed to be deleted (last event is Deleted),
|
||||
// then we should ignore Sync events, because it would result in
|
||||
// recreation of this object.
|
||||
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
|
||||
return nil
|
||||
}
|
||||
|
||||
newDeltas := append(f.items[id], Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
if f.deltaCompressor != nil {
|
||||
|
|
|
@ -28,18 +28,22 @@ func testPop(f *DeltaFIFO) testFifoObject {
|
|||
}
|
||||
|
||||
// keyLookupFunc adapts a raw function to be a KeyLookup.
|
||||
type keyLookupFunc func() []string
|
||||
type keyLookupFunc func() []testFifoObject
|
||||
|
||||
// ListKeys just calls kl.
|
||||
func (kl keyLookupFunc) ListKeys() []string {
|
||||
return kl()
|
||||
result := []string{}
|
||||
for _, fifoObj := range kl() {
|
||||
result = append(result, fifoObj.name)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// GetByKey returns the key if it exists in the list returned by kl.
|
||||
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
|
||||
for _, v := range kl() {
|
||||
if v == key {
|
||||
return key, true, nil
|
||||
if v.name == key {
|
||||
return v, true, nil
|
||||
}
|
||||
}
|
||||
return nil, false, nil
|
||||
|
@ -173,8 +177,8 @@ func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
|
|||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []string {
|
||||
return []string{"foo", "bar", "baz"}
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
)
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
|
@ -220,12 +224,52 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_ResyncNonExisting(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5)}
|
||||
}),
|
||||
)
|
||||
f.Delete(mkFifoObj("foo", 10))
|
||||
f.Resync()
|
||||
|
||||
deltas := f.items["foo"]
|
||||
if len(deltas) != 1 {
|
||||
t.Fatalf("unexpected deltas length: %v", deltas)
|
||||
}
|
||||
if deltas[0].Type != Deleted {
|
||||
t.Errorf("unexpected delta: %v", deltas[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_DeleteExistingNonPropagated(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{}
|
||||
}),
|
||||
)
|
||||
f.Add(mkFifoObj("foo", 5))
|
||||
f.Delete(mkFifoObj("foo", 6))
|
||||
|
||||
deltas := f.items["foo"]
|
||||
if len(deltas) != 2 {
|
||||
t.Fatalf("unexpected deltas length: %v", deltas)
|
||||
}
|
||||
if deltas[len(deltas)-1].Type != Deleted {
|
||||
t.Errorf("unexpected delta: %v", deltas[len(deltas)-1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []string {
|
||||
return []string{"foo", "bar", "baz"}
|
||||
keyLookupFunc(func() []testFifoObject {
|
||||
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
|
||||
}),
|
||||
)
|
||||
f.Delete(mkFifoObj("baz", 10))
|
||||
|
@ -236,7 +280,7 @@ func TestDeltaFIFO_ReplaceMakesDeletions(t *testing.T) {
|
|||
{{Sync, mkFifoObj("foo", 5)}},
|
||||
// Since "bar" didn't have a delete event and wasn't in the Replace list
|
||||
// it should get a tombstone key with the right Obj.
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: "bar"}}},
|
||||
{{Deleted, DeletedFinalStateUnknown{Key: "bar", Obj: mkFifoObj("bar", 6)}}},
|
||||
}
|
||||
|
||||
for _, expected := range expectedList {
|
||||
|
|
|
@ -45,6 +45,8 @@ type Queue interface {
|
|||
}
|
||||
|
||||
// Helper function for popping from Queue.
|
||||
// WARNING: Do NOT use this function in non-test code to avoid races
|
||||
// unless you really really really really know what you are doing.
|
||||
func Pop(queue Queue) interface{} {
|
||||
var result interface{}
|
||||
queue.Pop(func(obj interface{}) error {
|
||||
|
|
Loading…
Reference in New Issue