Push the full channel logic into the implementation of the broadcaster

in watch/mux.go rather than being in the client event recording code.
pull/6/head
Alex Robinson 2015-01-13 01:40:17 +00:00
parent 702a6f96b4
commit be6b1cf0e2
5 changed files with 64 additions and 59 deletions

View File

@ -32,10 +32,7 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
const ( const maxTriesPerEvent = 10
maxQueuedEvents = 1000
maxTriesPerEvent = 10
)
var ( var (
minSleep = float64(1 * time.Second) minSleep = float64(1 * time.Second)
@ -56,47 +53,26 @@ type EventRecorder interface {
// or used to stop recording, if desired. // or used to stop recording, if desired.
// TODO: make me an object with parameterizable queue length and retry interval // TODO: make me an object with parameterizable queue length and retry interval
func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface { func StartRecording(recorder EventRecorder, source api.EventSource) watch.Interface {
// Set up our own personal buffer of events so that we can clear out GetEvents'
// broadcast channel as quickly as possible to avoid causing the relatively more
// important event-producing goroutines from blocking while trying to insert events.
eventQueue := make(chan *api.Event, maxQueuedEvents)
// Run a function in the background that grabs events off the queue and tries
// to record them, retrying as appropriate to try to avoid dropping any.
go func() {
defer util.HandleCrash()
for event := range eventQueue {
tries := 0
for {
if recordEvent(recorder, event) {
break
}
tries++
if tries >= maxTriesPerEvent {
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
sleepDuration := time.Duration(
math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1))))
time.Sleep(wait.Jitter(sleepDuration, 0.5))
}
}
}()
// Finally, kick off the watcher that takes events from the channel and puts them
// onto the queue.
return GetEvents(func(event *api.Event) { return GetEvents(func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners. // Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this. // Events are safe to copy like this.
eventCopy := *event eventCopy := *event
event = &eventCopy event = &eventCopy
event.Source = source event.Source = source
// Drop new events rather than old ones because the old ones may contain
// some information explaining why everything is so backed up. tries := 0
if len(eventQueue) == maxQueuedEvents { for {
glog.Errorf("Unable to write event '%#v' (event buffer full!)", event) if recordEvent(recorder, event) {
} else { break
eventQueue <- event }
tries++
if tries >= maxTriesPerEvent {
glog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
break
}
sleepDuration := time.Duration(
math.Min(maxSleep, minSleep*math.Pow(backoffExp, float64(tries-1))))
time.Sleep(wait.Jitter(sleepDuration, 0.5))
} }
}) })
} }
@ -163,9 +139,9 @@ func GetEvents(f func(*api.Event)) watch.Interface {
return w return w
} }
const queueLen = 1000 const maxQueuedEvents = 1000
var events = watch.NewBroadcaster(queueLen) var events = watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull)
// Event constructs an event from the given information and puts it in the queue for sending. // Event constructs an event from the given information and puts it in the queue for sending.
// 'object' is the object this event is about. Event will make a reference-- or you may also // 'object' is the object this event is about. Event will make a reference-- or you may also

View File

@ -39,7 +39,7 @@ type GenericRegistry struct {
func NewGeneric(list runtime.Object) *GenericRegistry { func NewGeneric(list runtime.Object) *GenericRegistry {
return &GenericRegistry{ return &GenericRegistry{
ObjectList: list, ObjectList: list,
Broadcaster: watch.NewBroadcaster(0), Broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
} }
} }

View File

@ -36,7 +36,7 @@ type PodRegistry struct {
func NewPodRegistry(pods *api.PodList) *PodRegistry { func NewPodRegistry(pods *api.PodList) *PodRegistry {
return &PodRegistry{ return &PodRegistry{
Pods: pods, Pods: pods,
broadcaster: watch.NewBroadcaster(0), broadcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
} }
} }

View File

@ -22,6 +22,15 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
) )
// FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch
// channel is full.
type FullChannelBehavior int
const (
WaitIfChannelFull = iota
DropIfChannelFull = iota
)
// Broadcaster distributes event notifications among any number of watchers. Every event // Broadcaster distributes event notifications among any number of watchers. Every event
// is delivered to every watcher. // is delivered to every watcher.
type Broadcaster struct { type Broadcaster struct {
@ -31,17 +40,27 @@ type Broadcaster struct {
nextWatcher int64 nextWatcher int64
incoming chan Event incoming chan Event
// How large to make watcher's channel.
watchQueueLength int
// If one of the watch channels is full, don't wait for it to become empty.
// Instead just deliver it to the watchers that do have space in their
// channels and move on to the next event.
// It's more fair to do this on a per-watcher basis than to do it on the
// "incoming" channel, which would allow one slow watcher to prevent all
// other watchers from getting new events.
fullChannelBehavior FullChannelBehavior
} }
// NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue. // NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher.
// When queueLength is 0, Action will block until any prior event has been // It is guaranteed that events will be distibuted in the order in which they ocur,
// completely distributed. It is guaranteed that events will be distibuted in the // but the order in which a single event is distributed among all of the watchers is unspecified.
// order in which they ocurr, but the order in which a single event is distributed func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster {
// among all of the watchers is unspecified.
func NewBroadcaster(queueLength int) *Broadcaster {
m := &Broadcaster{ m := &Broadcaster{
watchers: map[int64]*broadcasterWatcher{}, watchers: map[int64]*broadcasterWatcher{},
incoming: make(chan Event, queueLength), incoming: make(chan Event),
watchQueueLength: queueLength,
fullChannelBehavior: fullChannelBehavior,
} }
go m.loop() go m.loop()
return m return m
@ -56,7 +75,7 @@ func (m *Broadcaster) Watch() Interface {
id := m.nextWatcher id := m.nextWatcher
m.nextWatcher++ m.nextWatcher++
w := &broadcasterWatcher{ w := &broadcasterWatcher{
result: make(chan Event), result: make(chan Event, m.watchQueueLength),
stopped: make(chan struct{}), stopped: make(chan struct{}),
id: id, id: id,
m: m, m: m,
@ -119,10 +138,20 @@ func (m *Broadcaster) loop() {
func (m *Broadcaster) distribute(event Event) { func (m *Broadcaster) distribute(event Event) {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
for _, w := range m.watchers { if m.fullChannelBehavior == DropIfChannelFull {
select { for _, w := range m.watchers {
case w.result <- event: select {
case <-w.stopped: case w.result <- event:
case <-w.stopped:
default: // Don't block if the event can't be queued.
}
}
} else {
for _, w := range m.watchers {
select {
case w.result <- event:
case <-w.stopped:
}
} }
} }
} }

View File

@ -39,7 +39,7 @@ func TestBroadcaster(t *testing.T) {
} }
// The broadcaster we're testing // The broadcaster we're testing
m := NewBroadcaster(0) m := NewBroadcaster(0, WaitIfChannelFull)
// Add a bunch of watchers // Add a bunch of watchers
const testWatchers = 2 const testWatchers = 2
@ -77,7 +77,7 @@ func TestBroadcaster(t *testing.T) {
} }
func TestBroadcasterWatcherClose(t *testing.T) { func TestBroadcasterWatcherClose(t *testing.T) {
m := NewBroadcaster(0) m := NewBroadcaster(0, WaitIfChannelFull)
w := m.Watch() w := m.Watch()
w2 := m.Watch() w2 := m.Watch()
w.Stop() w.Stop()
@ -95,7 +95,7 @@ func TestBroadcasterWatcherClose(t *testing.T) {
func TestBroadcasterWatcherStopDeadlock(t *testing.T) { func TestBroadcasterWatcherStopDeadlock(t *testing.T) {
done := make(chan bool) done := make(chan bool)
m := NewBroadcaster(0) m := NewBroadcaster(0, WaitIfChannelFull)
go func(w0, w1 Interface) { go func(w0, w1 Interface) {
// We know Broadcaster is in the distribute loop once one watcher receives // We know Broadcaster is in the distribute loop once one watcher receives
// an event. Stop the other watcher while distribute is trying to // an event. Stop the other watcher while distribute is trying to