mirror of https://github.com/k3s-io/k3s
fix delta fifo & various fakes for go1.5.1
parent
b9c7cf43b2
commit
4bdb1259a7
|
@ -154,10 +154,31 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
|
|||
return f.queueActionLocked(Updated, obj)
|
||||
}
|
||||
|
||||
// Delete is just like Add, but makes an Deleted Delta.
|
||||
// Delete is just like Add, but makes an Deleted Delta. If the item does not
|
||||
// already exist, it will be ignored. (It may have already been deleted by a
|
||||
// Replace (re-list), for example.
|
||||
func (f *DeltaFIFO) Delete(obj interface{}) error {
|
||||
id, err := f.KeyOf(obj)
|
||||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
if f.knownObjectKeys == nil {
|
||||
if _, exists := f.items[id]; !exists {
|
||||
// Presumably, this was deleted when a relist happened.
|
||||
// Don't provide a second report of the same deletion.
|
||||
return nil
|
||||
}
|
||||
} else if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok {
|
||||
if _, exists, err := keyGetter.GetByKey(id); err == nil && !exists {
|
||||
// Presumably, this was deleted when a relist happened.
|
||||
// Don't provide a second report of the same deletion.
|
||||
// This may be racy-- we aren't properly locked with knownObjectKeys.
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return f.queueActionLocked(Deleted, obj)
|
||||
}
|
||||
|
||||
|
@ -191,6 +212,43 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// re-listing and watching can deliver the same update multiple times in any
|
||||
// order. This will combine the most recent two deltas if they are the same.
|
||||
func dedupDeltas(deltas Deltas) Deltas {
|
||||
n := len(deltas)
|
||||
if n < 2 {
|
||||
return deltas
|
||||
}
|
||||
a := &deltas[n-1]
|
||||
b := &deltas[n-2]
|
||||
if out := isDup(a, b); out != nil {
|
||||
d := append(Deltas{}, deltas[:n-2]...)
|
||||
return append(d, *out)
|
||||
}
|
||||
return deltas
|
||||
}
|
||||
|
||||
// If a & b represent the same event, returns the delta that ought to be kept. Otherwise, nil.
|
||||
func isDup(a, b *Delta) *Delta {
|
||||
if out := isDeletionDup(a, b); out != nil {
|
||||
return out
|
||||
}
|
||||
// TODO: Detect other duplicate situations? Are there any?
|
||||
return nil
|
||||
}
|
||||
|
||||
// keep the one with the most information if both are deletions.
|
||||
func isDeletionDup(a, b *Delta) *Delta {
|
||||
if b.Type != Deleted || a.Type != Deleted {
|
||||
return nil
|
||||
}
|
||||
// Do more sophisticated checks, or is this sufficient?
|
||||
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// queueActionLocked appends to the delta list for the object, calling
|
||||
// f.deltaCompressor if needed
|
||||
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
|
||||
|
@ -199,6 +257,7 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
|||
return KeyError{obj, err}
|
||||
}
|
||||
newDeltas := append(f.items[id], Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
if f.deltaCompressor != nil {
|
||||
newDeltas = f.deltaCompressor.Compress(newDeltas)
|
||||
}
|
||||
|
@ -310,39 +369,42 @@ func (f *DeltaFIFO) Pop() interface{} {
|
|||
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
for _, item := range list {
|
||||
if err := f.queueActionLocked(Sync, item); err != nil {
|
||||
return fmt.Errorf("couldn't enqueue object: %v", err)
|
||||
}
|
||||
}
|
||||
if f.knownObjectKeys == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
keySet := make(sets.String, len(list))
|
||||
keys := make(sets.String, len(list))
|
||||
for _, item := range list {
|
||||
key, err := f.KeyOf(item)
|
||||
if err != nil {
|
||||
return KeyError{item, err}
|
||||
}
|
||||
keySet.Insert(key)
|
||||
keys.Insert(key)
|
||||
if err := f.queueActionLocked(Sync, item); err != nil {
|
||||
return fmt.Errorf("couldn't enqueue object: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if f.knownObjectKeys == nil {
|
||||
// Do deletion detection against our own list.
|
||||
for k, oldItem := range f.items {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
var deletedObj interface{}
|
||||
if n := oldItem.Newest(); n != nil {
|
||||
deletedObj = n.Object
|
||||
}
|
||||
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Detect deletions not already in the queue.
|
||||
knownKeys := f.knownObjectKeys.ListKeys()
|
||||
for _, k := range knownKeys {
|
||||
if _, exists := keySet[k]; exists {
|
||||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
|
||||
// This key isn't in the complete set we got, so it must have been deleted.
|
||||
if d, exists := f.items[k]; exists {
|
||||
// Don't issue a delete delta if we have one enqueued as the most
|
||||
// recent delta.
|
||||
if d.Newest().Type == Deleted {
|
||||
continue
|
||||
}
|
||||
}
|
||||
var deletedObj interface{}
|
||||
if keyGetter, ok := f.knownObjectKeys.(KeyGetter); ok {
|
||||
var exists bool
|
||||
|
@ -364,6 +426,12 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
|
||||
type KeyListerGetter interface {
|
||||
KeyLister
|
||||
KeyGetter
|
||||
}
|
||||
|
||||
// A KeyLister is anything that knows how to list its keys.
|
||||
type KeyLister interface {
|
||||
ListKeys() []string
|
||||
|
|
|
@ -98,13 +98,13 @@ func TestDeltaFIFO_compressorWorks(t *testing.T) {
|
|||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("foo", 12))
|
||||
f.Replace([]interface{}{mkFifoObj("foo", 20)}, "0")
|
||||
f.Delete(mkFifoObj("foo", 15))
|
||||
f.Delete(mkFifoObj("foo", 18)) // flush the last one out
|
||||
f.Delete(mkFifoObj("foo", 22))
|
||||
f.Add(mkFifoObj("foo", 25)) // flush the last one out
|
||||
expect := []DeltaType{Added, Updated, Sync, Deleted}
|
||||
if e, a := expect, oldestTypes; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
if e, a := (Deltas{{Deleted, mkFifoObj("foo", 18)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) {
|
||||
if e, a := (Deltas{{Added, mkFifoObj("foo", 25)}}), f.Pop().(Deltas); !reflect.DeepEqual(e, a) {
|
||||
t.Fatalf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
|
||||
|
@ -126,7 +126,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
|||
got := make(chan testFifoObject, 2)
|
||||
go func() {
|
||||
for {
|
||||
got <- testPop(f)
|
||||
obj := f.Pop().(Deltas).Newest().Object.(testFifoObject)
|
||||
t.Logf("got a thing %#v", obj)
|
||||
t.Logf("D len: %v", len(f.queue))
|
||||
got <- obj
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -145,10 +148,39 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_enqueueing(t *testing.T) {
|
||||
func TestDeltaFIFO_enqueueingNoLister(t *testing.T) {
|
||||
f := NewDeltaFIFO(testFifoObjectKeyFunc, nil, nil)
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("bar", 15))
|
||||
f.Add(mkFifoObj("qux", 17))
|
||||
f.Delete(mkFifoObj("qux", 18))
|
||||
|
||||
// This delete does not enqueue anything because baz doesn't exist.
|
||||
f.Delete(mkFifoObj("baz", 20))
|
||||
|
||||
expectList := []int{10, 15, 18}
|
||||
for _, expect := range expectList {
|
||||
if e, a := expect, testPop(f).val; e != a {
|
||||
t.Errorf("Didn't get updated value (%v), got %v", e, a)
|
||||
}
|
||||
}
|
||||
if e, a := 0, len(f.items); e != a {
|
||||
t.Errorf("queue unexpectedly not empty: %v != %v\n%#v", e, a, f.items)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeltaFIFO_enqueueingWithLister(t *testing.T) {
|
||||
f := NewDeltaFIFO(
|
||||
testFifoObjectKeyFunc,
|
||||
nil,
|
||||
keyLookupFunc(func() []string {
|
||||
return []string{"foo", "bar", "baz"}
|
||||
}),
|
||||
)
|
||||
f.Add(mkFifoObj("foo", 10))
|
||||
f.Update(mkFifoObj("bar", 15))
|
||||
|
||||
// This delete does enqueue the deletion, because "baz" is in the key lister.
|
||||
f.Delete(mkFifoObj("baz", 20))
|
||||
|
||||
expectList := []int{10, 15, 20}
|
||||
|
|
|
@ -311,30 +311,6 @@ func TestUpdate(t *testing.T) {
|
|||
pair{FROM, FROM}: true,
|
||||
}
|
||||
|
||||
var testDoneWG sync.WaitGroup
|
||||
|
||||
// Make a controller that deletes things once it observes an update.
|
||||
// It calls Done() on the wait group on deletions so we can tell when
|
||||
// everything we've added has been deleted.
|
||||
_, controller := framework.NewInformer(
|
||||
source,
|
||||
&api.Pod{},
|
||||
time.Millisecond*1,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
|
||||
from, to := o.Labels["check"], n.Labels["check"]
|
||||
if !allowedTransitions[pair{from, to}] {
|
||||
t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
|
||||
}
|
||||
source.Delete(n)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
testDoneWG.Done()
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
pod := func(name, check string) *api.Pod {
|
||||
return &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
|
@ -368,8 +344,32 @@ func TestUpdate(t *testing.T) {
|
|||
}
|
||||
|
||||
const threads = 3
|
||||
|
||||
var testDoneWG sync.WaitGroup
|
||||
testDoneWG.Add(threads * len(tests))
|
||||
|
||||
// Make a controller that deletes things once it observes an update.
|
||||
// It calls Done() on the wait group on deletions so we can tell when
|
||||
// everything we've added has been deleted.
|
||||
_, controller := framework.NewInformer(
|
||||
source,
|
||||
&api.Pod{},
|
||||
time.Millisecond*1,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
o, n := oldObj.(*api.Pod), newObj.(*api.Pod)
|
||||
from, to := o.Labels["check"], n.Labels["check"]
|
||||
if !allowedTransitions[pair{from, to}] {
|
||||
t.Errorf("observed transition %q -> %q for %v", from, to, n.Name)
|
||||
}
|
||||
source.Delete(n)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
testDoneWG.Done()
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// Run the controller and run it until we close stop.
|
||||
// Once Run() is called, calls to testDoneWG.Done() might start, so
|
||||
// all testDoneWG.Add() calls must happen before this point
|
||||
|
|
|
@ -103,7 +103,7 @@ func (f *FakeControllerSource) Change(e watch.Event, watchProbability float64) {
|
|||
panic(err) // this is test code only
|
||||
}
|
||||
|
||||
resourceVersion := len(f.changes)
|
||||
resourceVersion := len(f.changes) + 1
|
||||
objMeta.ResourceVersion = strconv.Itoa(resourceVersion)
|
||||
f.changes = append(f.changes, e)
|
||||
key := f.key(objMeta)
|
||||
|
@ -127,7 +127,7 @@ func (f *FakeControllerSource) List() (runtime.Object, error) {
|
|||
for _, obj := range f.items {
|
||||
// Must make a copy to allow clients to modify the object.
|
||||
// Otherwise, if they make a change and write it back, they
|
||||
// will inadvertently change the our canonical copy (in
|
||||
// will inadvertently change our canonical copy (in
|
||||
// addition to racing with other clients).
|
||||
objCopy, err := api.Scheme.DeepCopy(obj)
|
||||
if err != nil {
|
||||
|
@ -157,7 +157,6 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, e
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rc++ // Don't re-send them a change they already have.
|
||||
if rc < len(f.changes) {
|
||||
changes := []watch.Event{}
|
||||
for _, c := range f.changes[rc:] {
|
||||
|
@ -178,3 +177,11 @@ func (f *FakeControllerSource) Watch(resourceVersion string) (watch.Interface, e
|
|||
}
|
||||
return f.broadcaster.Watch(), nil
|
||||
}
|
||||
|
||||
// Shutdown closes the underlying broadcaster, waiting for events to be
|
||||
// delivered. It's an error to call any method after calling shutdown. This is
|
||||
// enforced by Shutdown() leaving f locked.
|
||||
func (f *FakeControllerSource) Shutdown() {
|
||||
f.lock.Lock() // Purposely no unlock.
|
||||
f.broadcaster.Shutdown()
|
||||
}
|
||||
|
|
|
@ -17,11 +17,36 @@ limitations under the License.
|
|||
package framework
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// ensure the watch delivers the requested and only the requested items.
|
||||
func consume(t *testing.T, w watch.Interface, rvs []string, done *sync.WaitGroup) {
|
||||
defer done.Done()
|
||||
for _, rv := range rvs {
|
||||
got, ok := <-w.ResultChan()
|
||||
if !ok {
|
||||
t.Errorf("%#v: unexpected channel close, wanted %v", rvs, rv)
|
||||
return
|
||||
}
|
||||
gotRV := got.Object.(*api.Pod).ObjectMeta.ResourceVersion
|
||||
if e, a := rv, gotRV; e != a {
|
||||
t.Errorf("wanted %v, got %v", e, a)
|
||||
} else {
|
||||
t.Logf("Got %v as expected", gotRV)
|
||||
}
|
||||
}
|
||||
// We should not get anything else.
|
||||
got, open := <-w.ResultChan()
|
||||
if open {
|
||||
t.Errorf("%#v: unwanted object %#v", rvs, got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRCNumber(t *testing.T) {
|
||||
pod := func(name string) *api.Pod {
|
||||
return &api.Pod{
|
||||
|
@ -31,6 +56,9 @@ func TestRCNumber(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(3)
|
||||
|
||||
source := NewFakeControllerSource()
|
||||
source.Add(pod("foo"))
|
||||
source.Modify(pod("foo"))
|
||||
|
@ -40,9 +68,27 @@ func TestRCNumber(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer w.Stop()
|
||||
got := <-w.ResultChan()
|
||||
if e, a := "2", got.Object.(*api.Pod).ObjectMeta.ResourceVersion; e != a {
|
||||
go consume(t, w, []string{"2", "3"}, wg)
|
||||
|
||||
list, err := source.List()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
if e, a := "3", list.(*api.List).ResourceVersion; e != a {
|
||||
t.Errorf("wanted %v, got %v", e, a)
|
||||
}
|
||||
|
||||
w2, err := source.Watch("2")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
go consume(t, w2, []string{"3"}, wg)
|
||||
|
||||
w3, err := source.Watch("3")
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
go consume(t, w3, []string{}, wg)
|
||||
source.Shutdown()
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
@ -39,6 +39,8 @@ const incomingQueueLength = 25
|
|||
// Broadcaster distributes event notifications among any number of watchers. Every event
|
||||
// is delivered to every watcher.
|
||||
type Broadcaster struct {
|
||||
// TODO: see if this lock is needed now that new watchers go through
|
||||
// the incoming channel.
|
||||
lock sync.Mutex
|
||||
|
||||
watchers map[int64]*broadcasterWatcher
|
||||
|
@ -73,21 +75,48 @@ func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *B
|
|||
return m
|
||||
}
|
||||
|
||||
const internalRunFunctionMarker = "internal-do-function"
|
||||
|
||||
// a function type we can shoehorn into the queue.
|
||||
type functionFakeRuntimeObject func()
|
||||
|
||||
func (functionFakeRuntimeObject) IsAnAPIObject() {}
|
||||
|
||||
// Execute f, blocking the incoming queue (and waiting for it to drain first).
|
||||
// The purpose of this terrible hack is so that watchers added after an event
|
||||
// won't ever see that event, and will always see any event after they are
|
||||
// added.
|
||||
func (b *Broadcaster) blockQueue(f func()) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
b.incoming <- Event{
|
||||
Type: internalRunFunctionMarker,
|
||||
Object: functionFakeRuntimeObject(func() {
|
||||
defer wg.Done()
|
||||
f()
|
||||
}),
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Watch adds a new watcher to the list and returns an Interface for it.
|
||||
// Note: new watchers will only receive new events. They won't get an entire history
|
||||
// of previous events.
|
||||
func (m *Broadcaster) Watch() Interface {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
id := m.nextWatcher
|
||||
m.nextWatcher++
|
||||
w := &broadcasterWatcher{
|
||||
result: make(chan Event, m.watchQueueLength),
|
||||
stopped: make(chan struct{}),
|
||||
id: id,
|
||||
m: m,
|
||||
}
|
||||
m.watchers[id] = w
|
||||
var w *broadcasterWatcher
|
||||
m.blockQueue(func() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
id := m.nextWatcher
|
||||
m.nextWatcher++
|
||||
w = &broadcasterWatcher{
|
||||
result: make(chan Event, m.watchQueueLength),
|
||||
stopped: make(chan struct{}),
|
||||
id: id,
|
||||
m: m,
|
||||
}
|
||||
m.watchers[id] = w
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
||||
|
@ -96,24 +125,27 @@ func (m *Broadcaster) Watch() Interface {
|
|||
// The returned watch will have a queue length that is at least large enough to accommodate
|
||||
// all of the items in queuedEvents.
|
||||
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
id := m.nextWatcher
|
||||
m.nextWatcher++
|
||||
length := m.watchQueueLength
|
||||
if n := len(queuedEvents) + 1; n > length {
|
||||
length = n
|
||||
}
|
||||
w := &broadcasterWatcher{
|
||||
result: make(chan Event, length),
|
||||
stopped: make(chan struct{}),
|
||||
id: id,
|
||||
m: m,
|
||||
}
|
||||
m.watchers[id] = w
|
||||
for _, e := range queuedEvents {
|
||||
w.result <- e
|
||||
}
|
||||
var w *broadcasterWatcher
|
||||
m.blockQueue(func() {
|
||||
m.lock.Lock()
|
||||
defer m.lock.Unlock()
|
||||
id := m.nextWatcher
|
||||
m.nextWatcher++
|
||||
length := m.watchQueueLength
|
||||
if n := len(queuedEvents) + 1; n > length {
|
||||
length = n
|
||||
}
|
||||
w = &broadcasterWatcher{
|
||||
result: make(chan Event, length),
|
||||
stopped: make(chan struct{}),
|
||||
id: id,
|
||||
m: m,
|
||||
}
|
||||
m.watchers[id] = w
|
||||
for _, e := range queuedEvents {
|
||||
w.result <- e
|
||||
}
|
||||
})
|
||||
return w
|
||||
}
|
||||
|
||||
|
@ -167,6 +199,10 @@ func (m *Broadcaster) loop() {
|
|||
if !ok {
|
||||
break
|
||||
}
|
||||
if event.Type == internalRunFunctionMarker {
|
||||
event.Object.(functionFakeRuntimeObject)()
|
||||
continue
|
||||
}
|
||||
m.distribute(event)
|
||||
}
|
||||
m.closeAll()
|
||||
|
|
Loading…
Reference in New Issue