From b62fa1d72855ef1eb02ec2c908b16b7477feb3b1 Mon Sep 17 00:00:00 2001
From: Derek Carr
Date: Thu, 15 Jun 2017 15:58:06 -0400
Subject: [PATCH] Add client side event rate limiting

---
 .../src/k8s.io/client-go/tools/record/BUILD   |  1 +
 .../client-go/tools/record/event_test.go      | 21 ++--
 .../client-go/tools/record/events_cache.go    | 98 ++++++++++++++++++-
 .../tools/record/events_cache_test.go         | 26 ++++-
 4 files changed, 131 insertions(+), 15 deletions(-)

diff --git a/staging/src/k8s.io/client-go/tools/record/BUILD b/staging/src/k8s.io/client-go/tools/record/BUILD
index f74f58c594..6d69699799 100644
--- a/staging/src/k8s.io/client-go/tools/record/BUILD
+++ b/staging/src/k8s.io/client-go/tools/record/BUILD
@@ -53,5 +53,6 @@ go_library(
         "//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
         "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/tools/reference:go_default_library",
+        "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
     ],
 )
diff --git a/staging/src/k8s.io/client-go/tools/record/event_test.go b/staging/src/k8s.io/client-go/tools/record/event_test.go
index 33889a9a2e..07b6df2187 100644
--- a/staging/src/k8s.io/client-go/tools/record/event_test.go
+++ b/staging/src/k8s.io/client-go/tools/record/event_test.go
@@ -412,7 +412,8 @@ func TestWriteEventError(t *testing.T) {
 		},
 	}
 
-	eventCorrelator := NewEventCorrelator(clock.RealClock{})
+	clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
+	eventCorrelator := NewEventCorrelator(&clock)
 	randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
 
 	for caseName, ent := range table {
@@ -435,7 +436,8 @@ func TestWriteEventError(t *testing.T) {
 }
 
 func TestUpdateExpiredEvent(t *testing.T) {
-	eventCorrelator := NewEventCorrelator(clock.RealClock{})
+	clock := clock.IntervalClock{Time: time.Now(), Duration: time.Second}
+	eventCorrelator := NewEventCorrelator(&clock)
 	randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
 
 	var createdEvent *v1.Event
@@ -497,14 +499,15 @@ func TestLotsOfEvents(t *testing.T) {
 		loggerCalled <- struct{}{}
 	})
 	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "eventTest"})
-	ref := &v1.ObjectReference{
-		Kind:       "Pod",
-		Name:       "foo",
-		Namespace:  "baz",
-		UID:        "bar",
-		APIVersion: "version",
-	}
 	for i := 0; i < maxQueuedEvents; i++ {
+		// we want a unique object to stop spam filtering
+		ref := &v1.ObjectReference{
+			Kind:       "Pod",
+			Name:       fmt.Sprintf("foo-%v", i),
+			Namespace:  "baz",
+			UID:        "bar",
+			APIVersion: "version",
+		}
 		// we need to vary the reason to prevent aggregation
 		go recorder.Eventf(ref, v1.EventTypeNormal, "Reason-"+string(i), strconv.Itoa(i))
 	}
diff --git a/staging/src/k8s.io/client-go/tools/record/events_cache.go b/staging/src/k8s.io/client-go/tools/record/events_cache.go
index 01704b6c0d..6ac767c9f0 100644
--- a/staging/src/k8s.io/client-go/tools/record/events_cache.go
+++ b/staging/src/k8s.io/client-go/tools/record/events_cache.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/strategicpatch"
+	"k8s.io/client-go/util/flowcontrol"
 )
 
 const (
@@ -39,6 +40,13 @@
 	// more than 10 times in a 10 minute period, aggregate the event
 	defaultAggregateMaxEvents = 10
 	defaultAggregateIntervalInSeconds = 600
+
+	// by default, allow a source to send 25 events about an object
+	// but control the refill rate to 1 new event every 5 minutes
+	// this helps control the long-tail of events for things that are always
+	// unhealthy
+	defaultSpamBurst = 25
+	defaultSpamQPS   = 1. / 300.
 )
 
 // getEventKey builds unique event key based on source, involvedObject, reason, message
@@ -59,6 +67,20 @@ func getEventKey(event *v1.Event) string {
 		"")
 }
 
+// getSpamKey builds unique event key based on source, involvedObject
+func getSpamKey(event *v1.Event) string {
+	return strings.Join([]string{
+		event.Source.Component,
+		event.Source.Host,
+		event.InvolvedObject.Kind,
+		event.InvolvedObject.Namespace,
+		event.InvolvedObject.Name,
+		string(event.InvolvedObject.UID),
+		event.InvolvedObject.APIVersion,
+	},
+		"")
+}
+
 // EventFilterFunc is a function that returns true if the event should be skipped
 type EventFilterFunc func(event *v1.Event) bool
 
@@ -67,6 +89,69 @@ func DefaultEventFilterFunc(event *v1.Event) bool {
 	return false
 }
 
+// EventSourceObjectSpamFilter is responsible for throttling
+// the amount of events a source and object can produce.
+type EventSourceObjectSpamFilter struct {
+	sync.RWMutex
+
+	// the cache that manages last synced state
+	cache *lru.Cache
+
+	// burst is the amount of events we allow per source + object
+	burst int
+
+	// qps is the refill rate of the token bucket in queries per second
+	qps float32
+
+	// clock is used to allow for testing over a time interval
+	clock clock.Clock
+}
+
+// NewEventSourceObjectSpamFilter allows burst events from a source about an object with the specified qps refill.
+func NewEventSourceObjectSpamFilter(lruCacheSize, burst int, qps float32, clock clock.Clock) *EventSourceObjectSpamFilter {
+	return &EventSourceObjectSpamFilter{
+		cache: lru.New(lruCacheSize),
+		burst: burst,
+		qps:   qps,
+		clock: clock,
+	}
+}
+
+// spamRecord holds data used to perform spam filtering decisions.
+type spamRecord struct {
+	// rateLimiter controls the rate of events about this object
+	rateLimiter flowcontrol.RateLimiter
+}
+
+// Filter controls that a given source+object are not exceeding the allowed rate.
+func (f *EventSourceObjectSpamFilter) Filter(event *v1.Event) bool {
+	var record spamRecord
+
+	// controls our cached information about this event (source+object)
+	eventKey := getSpamKey(event)
+
+	// do we have a record of similar events in our cache?
+	f.Lock()
+	defer f.Unlock()
+	value, found := f.cache.Get(eventKey)
+	if found {
+		record = value.(spamRecord)
+	}
+
+	// verify we have a rate limiter for this record
+	if record.rateLimiter == nil {
+		record.rateLimiter = flowcontrol.NewTokenBucketRateLimiterWithClock(f.qps, f.burst, f.clock)
+	}
+
+	// ensure we have available rate
+	filter := !record.rateLimiter.TryAccept()
+
+	// update the cache
+	f.cache.Add(eventKey, record)
+
+	return filter
+}
+
 // EventAggregatorKeyFunc is responsible for grouping events for aggregation
 // It returns a tuple of the following:
 //   aggregateKey - key the identifies the aggregate group to bucket this event
@@ -337,7 +422,6 @@ type EventCorrelateResult struct {
 // prior to interacting with the API server to record the event.
 //
 // The default behavior is as follows:
-// * No events are filtered from being recorded
 // * Aggregation is performed if a similar event is recorded 10 times in a
 //   in a 10 minute rolling interval. A similar event is an event that varies only by
 //   the Event.Message field. Rather than recording the precise event, aggregation
@@ -345,10 +429,13 @@ type EventCorrelateResult struct {
 //   the same reason.
 // * Events are incrementally counted if the exact same event is encountered multiple
 //   times.
+// * A source may burst 25 events about an object, but has a refill rate budget
+//   per object of 1 event every 5 minutes to control long-tail of spam.
 func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
 	cacheSize := maxLruCacheEntries
+	spamFilter := NewEventSourceObjectSpamFilter(cacheSize, defaultSpamBurst, defaultSpamQPS, clock)
 	return &EventCorrelator{
-		filterFunc: DefaultEventFilterFunc,
+		filterFunc: spamFilter.Filter,
 		aggregator: NewEventAggregator(
 			cacheSize,
 			EventAggregatorByReasonFunc,
@@ -363,11 +450,14 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
 
 // EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
 func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateResult, error) {
-	if c.filterFunc(newEvent) {
-		return &EventCorrelateResult{Skip: true}, nil
+	if newEvent == nil {
+		return nil, fmt.Errorf("event is nil")
 	}
 	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
 	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
+	if c.filterFunc(observedEvent) {
+		return &EventCorrelateResult{Skip: true}, nil
+	}
 	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
 }
 
diff --git a/staging/src/k8s.io/client-go/tools/record/events_cache_test.go b/staging/src/k8s.io/client-go/tools/record/events_cache_test.go
index 799d6a77b2..a4fc2a0f61 100644
--- a/staging/src/k8s.io/client-go/tools/record/events_cache_test.go
+++ b/staging/src/k8s.io/client-go/tools/record/events_cache_test.go
@@ -181,6 +181,7 @@ func TestEventCorrelator(t *testing.T) {
 		newEvent        v1.Event
 		expectedEvent   v1.Event
 		intervalSeconds int
+		expectedSkip    bool
 	}{
 		"create-a-single-event": {
 			previousEvents: []v1.Event{},
@@ -198,7 +199,13 @@ func TestEventCorrelator(t *testing.T) {
 			previousEvents:  makeEvents(defaultAggregateMaxEvents, duplicateEvent),
 			newEvent:        duplicateEvent,
 			expectedEvent:   setCount(duplicateEvent, defaultAggregateMaxEvents+1),
-			intervalSeconds: 5,
+			intervalSeconds: 30, // larger interval induces aggregation but not spam.
+		},
+		"the-same-event-is-spam-if-happens-too-frequently": {
+			previousEvents:  makeEvents(defaultSpamBurst+1, duplicateEvent),
+			newEvent:        duplicateEvent,
+			expectedSkip:    true,
+			intervalSeconds: 1,
 		},
 		"create-many-unique-events": {
 			previousEvents: makeUniqueEvents(30),
@@ -245,7 +252,10 @@ func TestEventCorrelator(t *testing.T) {
 			if err != nil {
 				t.Errorf("scenario %v: unexpected error playing back prevEvents %v", testScenario, err)
 			}
-			correlator.UpdateState(result.Event)
+			// if we are skipping the event, we can avoid updating state
+			if !result.Skip {
+				correlator.UpdateState(result.Event)
+			}
 		}
 
 		// update the input to current clock value
@@ -257,6 +267,18 @@ func TestEventCorrelator(t *testing.T) {
 			t.Errorf("scenario %v: unexpected error correlating input event %v", testScenario, err)
 		}
 
+		// verify we did not get skip from filter function unexpectedly...
+		if result.Skip != testInput.expectedSkip {
+			t.Errorf("scenario %v: expected skip %v, but got %v", testScenario, testInput.expectedSkip, result.Skip)
+			continue
+		}
+
+		// we wanted to actually skip, so no event is needed to validate
+		if testInput.expectedSkip {
+			continue
+		}
+
+		// validate event
 		_, err = validateEvent(testScenario, result.Event, &testInput.expectedEvent, t)
 		if err != nil {
 			t.Errorf("scenario %v: unexpected error validating result %v", testScenario, err)
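
Usage sketch (illustrative, not part of the diff above): the snippet below exercises the new
EventSourceObjectSpamFilter in isolation with a fake clock, to show how the burst budget and
refill rate interact for a single source+object pair. It assumes it lives in the record package;
the demoSpamFilter name, the cache size of 100, and the v1 import path are illustrative
assumptions, and whether advancing the fake clock refills the token bucket depends on the
flowcontrol implementation honoring the injected clock.

	package record

	import (
		"fmt"
		"time"

		"k8s.io/apimachinery/pkg/util/clock"
		"k8s.io/client-go/pkg/api/v1" // assumption: core v1 types as vendored by client-go at this time
	)

	// demoSpamFilter exhausts the per-source+object burst and then waits for a refill.
	func demoSpamFilter() {
		fakeClock := clock.NewFakeClock(time.Now())
		// cache size 100 (arbitrary), burst of 2 events, refill of one event every 5 minutes
		f := NewEventSourceObjectSpamFilter(100, 2, 1./300., fakeClock)

		event := &v1.Event{
			Source:         v1.EventSource{Component: "kubelet", Host: "node-1"},
			InvolvedObject: v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "web-0", UID: "uid-1"},
		}

		// the first two events are allowed (Filter returns false); later ones are dropped (true)
		for i := 0; i < 4; i++ {
			fmt.Printf("event %d filtered: %v\n", i, f.Filter(event))
		}

		// advancing past the refill interval should admit roughly one more event,
		// assuming the underlying token bucket honors the injected clock
		fakeClock.Step(6 * time.Minute)
		fmt.Printf("after refill, filtered: %v\n", f.Filter(event))
	}

The point of the design is visible here: aggregation and counting still run in EventCorrelate,
but the filter is consulted on the observed (already aggregated) event, so a chronically
unhealthy object can only push events at the qps refill rate once its burst is spent.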