Merge pull request #16798 from derekwaynecarr/key_compaction

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-11-12 02:45:15 -08:00
commit 706d3a77a9
6 changed files with 582 additions and 269 deletions

View File

@ -35,14 +35,23 @@ Documentation for other releases can be found at
This document captures the design of event compression.
## Background
Kubernetes components can get into a state where they generate tons of events which are identical except for the timestamp. For example, when pulling a non-existing image, Kubelet will repeatedly generate `image_not_existing` and `container_is_waiting` events until upstream components correct the image. When this happens, the spam from the repeated events makes the entire event mechanism useless. It also appears to cause memory pressure in etcd (see [#3853](http://issue.k8s.io/3853)).
Kubernetes components can get into a state where they generate tons of events.
The events can be categorized in one of two ways:
1. same - the event is identical to previous events except for the timestamp
2. similar - the event is identical to previous events except for the timestamp and message
For example, when pulling a non-existing image, Kubelet will repeatedly generate `image_not_existing` and `container_is_waiting` events until upstream components correct the image. When this happens, the spam from the repeated events makes the entire event mechanism useless. It also appears to cause memory pressure in etcd (see [#3853](http://issue.k8s.io/3853)).
The goal is to introduce event counting to increment the count on same events, and event aggregation to collapse similar events.
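For example, a sketch in Go using the `api.Event` type from this repository (field values are illustrative, and `exampleSimilarEvents` is a hypothetical helper, not part of this PR):

```go
func exampleSimilarEvents() (api.Event, api.Event) {
	// Same source, involved object, and reason; only the message differs,
	// so these two events are "similar" and are candidates for aggregation.
	// If the messages also matched, they would be the "same" event and
	// would simply increment the count on the first recorded instance.
	ref := api.ObjectReference{Kind: "Pod", Namespace: "ns", Name: "pod-1", UID: "uid-1", APIVersion: "v1"}
	src := api.EventSource{Component: "kubelet", Host: "node-1"}
	a := api.Event{InvolvedObject: ref, Source: src, Reason: "failedSync", Message: "error syncing container a"}
	b := api.Event{InvolvedObject: ref, Source: src, Reason: "failedSync", Message: "error syncing container b"}
	return a, b
}
```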
## Proposal
Each binary that generates events (for example, `kubelet`) should keep track of previously generated events so that it can collapse recurring events into a single event instead of creating a new instance for each new event.
Each binary that generates events (for example, `kubelet`) should keep track of previously generated events so that it can collapse recurring events into a single event instead of creating a new instance for each new event. In addition, if many similar events are
created, events should be aggregated into a single event to reduce spam.
Event compression should be best effort (not guaranteed); in the worst case, `n` identical (minus timestamp) events may still result in `n` event entries.
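A minimal sketch of why compression is best effort, assuming the `github.com/golang/groupcache/lru` package referenced below (`dedupCache` and its methods are illustrative, not part of this PR):

```go
package main

import (
	"fmt"

	"github.com/golang/groupcache/lru"
)

// dedupCache is an illustrative best-effort de-duplication cache. If an
// entry is evicted by the LRU, a recurring event is treated as new again,
// which is why compression is best effort rather than guaranteed.
type dedupCache struct {
	cache *lru.Cache
}

func newDedupCache(maxEntries int) *dedupCache {
	return &dedupCache{cache: lru.New(maxEntries)}
}

// observe returns the number of times the key has been seen, including this one.
func (d *dedupCache) observe(key string) int {
	count := 1
	if value, ok := d.cache.Get(key); ok {
		count = value.(int) + 1
	}
	d.cache.Add(key, count)
	return count
}

func main() {
	d := newDedupCache(2) // tiny cache to demonstrate eviction
	fmt.Println(d.observe("kubelet/node1/Pod/ns/pod-a")) // 1
	fmt.Println(d.observe("kubelet/node1/Pod/ns/pod-a")) // 2
	d.observe("key-b")
	d.observe("key-c")                                   // evicts pod-a from the size-2 LRU
	fmt.Println(d.observe("kubelet/node1/Pod/ns/pod-a")) // 1 again: best effort
}
```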
@ -61,6 +70,24 @@ Instead of a single Timestamp, each event object [contains](http://releases.k8s.
Each binary that generates events:
* Maintains a historical record of previously generated events:
* Implemented with ["Least Recently Used Cache"](https://github.com/golang/groupcache/blob/master/lru/lru.go) in [`pkg/client/record/events_cache.go`](../../pkg/client/record/events_cache.go).
* Implemented behind an `EventCorrelator` that manages two subcomponents: `EventAggregator` and `EventLogger`
* The `EventCorrelator` observes all incoming events and lets each subcomponent visit and modify the event in turn.
* The `EventAggregator` runs an aggregation function over each event. This function buckets each event based on an `aggregateKey`,
and identifies the event uniquely with a `localKey` in that bucket.
* The default aggregation function groups similar events that differ only by `event.Message`. Its `localKey` is `event.Message` and its aggregate key is produced by joining:
* `event.Source.Component`
* `event.Source.Host`
* `event.InvolvedObject.Kind`
* `event.InvolvedObject.Namespace`
* `event.InvolvedObject.Name`
* `event.InvolvedObject.UID`
* `event.InvolvedObject.APIVersion`
* `event.Reason`
* If the `EventAggregator` observes a similar event produced 10 times in a 10-minute window, it drops the event that was provided as
input and creates a new event that differs only in the message. The message denotes that this event is used to group similar events
that matched on reason. This aggregated `Event` is then used in the event processing sequence.
* The `EventLogger` observes the event output by the `EventAggregator` and tracks the number of times it has previously observed
that event by incrementing a key in a cache associated with that matching event.
* The key in the cache is generated from the event object minus timestamps/count/transient fields; specifically, the following event fields are used to construct a unique key for an event:
* `event.Source.Component`
* `event.Source.Host`
@ -71,7 +98,7 @@ Each binary that generates events:
* `event.InvolvedObject.APIVersion`
* `event.Reason`
* `event.Message`
* The LRU cache is capped at 4096 events. That means if a component (e.g. kubelet) runs for a long period of time and generates tons of unique events, the previously generated events cache will not grow unchecked in memory. Instead, after 4096 unique events are generated, the oldest events are evicted from the cache.
* The LRU cache is capped at 4096 events for both `EventAggregator` and `EventLogger`. That means if a component (e.g. kubelet) runs for a long period of time and generates tons of unique events, the previously generated events cache will not grow unchecked in memory. Instead, after 4096 unique events are generated, the oldest events are evicted from the cache.
* When an event is generated, the previously generated events cache is checked (see [`pkg/client/unversioned/record/event.go`](http://releases.k8s.io/HEAD/pkg/client/unversioned/record/event.go)).
* If the key for the new event matches the key for a previously generated event (meaning all of the above fields match between the new event and some previously generated event), then the event is considered to be a duplicate and the existing event entry is updated in etcd:
* The new PUT (update) event API is called to update the existing event entry in etcd with the new last seen timestamp and count.
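Putting the pieces above together, the recording path added in this PR (see the `event.go` change below) has roughly this shape; a condensed sketch with the event copy and retry/backoff logic elided, where `sink` is the `EventSink` passed to `StartRecordingToSink`:

```go
eventCorrelator := NewEventCorrelator(util.RealClock{})

handleEvent := func(event *api.Event) {
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		util.HandleError(err)
	}
	if result.Skip {
		return // filtered: no further processing
	}
	// Count > 1 means a duplicate was detected, so the existing entry is
	// PATCHed with result.Patch; otherwise a new event is POSTed.
	recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator)
}
```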

View File

@ -17,7 +17,6 @@ limitations under the License.
package record
import (
"encoding/json"
"fmt"
"math/rand"
"time"
@ -28,7 +27,6 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/strategicpatch"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
@ -107,33 +105,23 @@ func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSin
// The default math/rand package functions aren't thread safe, so create a
// new Rand object for each StartRecording call.
randGen := rand.New(rand.NewSource(time.Now().UnixNano()))
var eventCache *historyCache = NewEventCache()
eventCorrelator := NewEventCorrelator(util.RealClock{})
return eventBroadcaster.StartEventWatcher(
func(event *api.Event) {
// Make a copy before modification, because there could be multiple listeners.
// Events are safe to copy like this.
eventCopy := *event
event = &eventCopy
var patch []byte
previousEvent := eventCache.getEvent(event)
updateExistingEvent := previousEvent.Count > 0
if updateExistingEvent {
// we still need to copy Name because the Patch relies on the Name to find the target event
event.Name = previousEvent.Name
event.Count = previousEvent.Count + 1
// we need to make sure the Count and LastTimestamp are the only differences between event and the eventCopy2
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0))
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, _ = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
result, err := eventCorrelator.EventCorrelate(event)
if err != nil {
util.HandleError(err)
}
if result.Skip {
return
}
tries := 0
for {
if recordEvent(sink, event, patch, updateExistingEvent, eventCache) {
if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
break
}
tries++
@ -167,7 +155,7 @@ func isKeyNotFoundError(err error) bool {
// was successfully recorded or discarded, false if it should be retried.
// If updateExistingEvent is false, it creates a new event, otherwise it updates
// the existing event.
func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCache *historyCache) bool {
func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingEvent bool, eventCorrelator *EventCorrelator) bool {
var newEvent *api.Event
var err error
if updateExistingEvent {
@ -180,7 +168,8 @@ func recordEvent(sink EventSink, event *api.Event, patch []byte, updateExistingE
newEvent, err = sink.Create(event)
}
if err == nil {
eventCache.addOrUpdateEvent(newEvent)
// we need to update our event correlator with the server returned state to handle name/resourceversion
eventCorrelator.UpdateState(newEvent)
return true
}

View File

@ -19,10 +19,8 @@ package record
import (
"encoding/json"
"fmt"
"reflect"
"runtime"
"strconv"
"strings"
"testing"
"time"
@ -85,7 +83,7 @@ func OnPatchFactory(testCache map[string]*api.Event, patchEvent chan<- *api.Even
return func(event *api.Event, patch []byte) (*api.Event, error) {
cachedEvent, found := testCache[getEventKey(event)]
if !found {
return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache. Try to find Event: %v", event)
return nil, fmt.Errorf("unexpected error: couldn't find Event in testCache.")
}
originalData, err := json.Marshal(cachedEvent)
if err != nil {
@ -337,7 +335,7 @@ func TestEventf(t *testing.T) {
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
@ -353,10 +351,10 @@ func TestEventf(t *testing.T) {
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
logWatcher1.Stop()
logWatcher2.Stop()
@ -364,41 +362,6 @@ func TestEventf(t *testing.T) {
sinkWatcher.Stop()
}
func validateEvent(actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) {
recvEvent := *actualEvent
expectCompression := expectedEvent.Count > 1
t.Logf("expectedEvent.Count is %d\n", expectedEvent.Count)
// Just check that the timestamp was set.
if recvEvent.FirstTimestamp.IsZero() || recvEvent.LastTimestamp.IsZero() {
t.Errorf("timestamp wasn't set: %#v", recvEvent)
}
actualFirstTimestamp := recvEvent.FirstTimestamp
actualLastTimestamp := recvEvent.LastTimestamp
if actualFirstTimestamp.Equal(actualLastTimestamp) {
if expectCompression {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent)
}
} else {
if expectedEvent.Count == 1 {
t.Errorf("FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", actualFirstTimestamp, actualLastTimestamp, recvEvent)
}
}
// Temp clear time stamps for comparison because actual values don't matter for comparison
recvEvent.FirstTimestamp = expectedEvent.FirstTimestamp
recvEvent.LastTimestamp = expectedEvent.LastTimestamp
// Check that name has the right prefix.
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
t.Errorf("Name '%v' does not contain prefix '%v'", n, en)
}
recvEvent.Name = expectedEvent.Name
if e, a := expectedEvent, &recvEvent; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectGoPrintDiff(e, a))
}
recvEvent.FirstTimestamp = actualFirstTimestamp
recvEvent.LastTimestamp = actualLastTimestamp
return actualEvent, nil
}
func recorderWithFakeClock(eventSource api.EventSource, eventBroadcaster EventBroadcaster, clock util.Clock) EventRecorder {
return &recorderImpl{eventSource, eventBroadcaster.(*eventBroadcasterImpl).Broadcaster, clock}
}
@ -520,7 +483,8 @@ func TestLotsOfEvents(t *testing.T) {
APIVersion: "version",
}
for i := 0; i < maxQueuedEvents; i++ {
go recorder.Eventf(ref, "Reason", strconv.Itoa(i))
// we need to vary the reason to prevent aggregation
go recorder.Eventf(ref, "Reason-"+strconv.Itoa(i), strconv.Itoa(i))
}
// Make sure no events were dropped by either of the listeners.
for i := 0; i < maxQueuedEvents; i++ {
@ -605,7 +569,7 @@ func TestEventfNoNamespace(t *testing.T) {
clock := &util.FakeClock{time.Now()}
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
logWatcher1 := eventBroadcaster.StartLogging(t.Logf) // Prove that it is useful
logWatcher2 := eventBroadcaster.StartLogging(func(formatter string, args ...interface{}) {
@ -621,10 +585,10 @@ func TestEventfNoNamespace(t *testing.T) {
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
logWatcher1.Stop()
@ -878,33 +842,33 @@ func TestMultiSinkCache(t *testing.T) {
recorder := recorderWithFakeClock(api.EventSource{Component: "eventTest"}, eventBroadcaster, clock)
sinkWatcher := eventBroadcaster.StartRecordingToSink(&testEvents)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
}
// Another StartRecordingToSink call should start to record events with new clean cache.
sinkWatcher2 := eventBroadcaster.StartRecordingToSink(&testEvents2)
for _, item := range table {
for index, item := range table {
clock.Step(1 * time.Second)
recorder.Eventf(item.obj, item.reason, item.messageFmt, item.elements...)
// validate event
if item.expectUpdate {
actualEvent := <-patchEvent2
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
} else {
actualEvent := <-createEvent2
validateEvent(actualEvent, item.expect, t)
validateEvent(strconv.Itoa(index), actualEvent, item.expect, t)
}
}

View File

@ -17,85 +17,341 @@ limitations under the License.
package record
import (
"encoding/json"
"fmt"
"strings"
"sync"
"time"
"github.com/golang/groupcache/lru"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/strategicpatch"
)
type history struct {
// The number of times the event has occurred since first occurrence.
Count int
// The time at which the event was first recorded.
FirstTimestamp unversioned.Time
// The unique name of the first occurrence of this event
Name string
// Resource version returned from previous interaction with server
ResourceVersion string
}
const (
maxLruCacheEntries = 4096
// if we see the same event that varies only by message
// more than 10 times in a 10 minute period, aggregate the event
defaultAggregateMaxEvents = 10
defaultAggregateIntervalInSeconds = 600
)
type historyCache struct {
// getEventKey builds unique event key based on source, involvedObject, reason, message
func getEventKey(event *api.Event) string {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Reason,
event.Message,
},
"")
}
// EventFilterFunc is a function that returns true if the event should be skipped
type EventFilterFunc func(event *api.Event) bool
// DefaultEventFilterFunc returns false for all incoming events
func DefaultEventFilterFunc(event *api.Event) bool {
return false
}
// EventAggregatorKeyFunc is responsible for grouping events for aggregation
// It returns a tuple of the following:
// aggregateKey - key that identifies the aggregate group to bucket this event
// localKey - key that makes this event unique within the aggregate group
type EventAggregatorKeyFunc func(event *api.Event) (aggregateKey string, localKey string)
// EventAggregatorByReasonFunc aggregates events by exact match on event.Source, event.InvolvedObject, and event.Reason
func EventAggregatorByReasonFunc(event *api.Event) (string, string) {
return strings.Join([]string{
event.Source.Component,
event.Source.Host,
event.InvolvedObject.Kind,
event.InvolvedObject.Namespace,
event.InvolvedObject.Name,
string(event.InvolvedObject.UID),
event.InvolvedObject.APIVersion,
event.Reason,
},
""), event.Message
}
// EventAggregatorMessageFunc is responsible for producing an aggregation message
type EventAggregatorMessageFunc func(event *api.Event) string
// EventAggregatorByReasonMessageFunc returns a generic message to use for an aggregate event
func EventAggregatorByReasonMessageFunc(event *api.Event) string {
return "(events with common reason combined)"
}
// EventAggregator identifies similar events and aggregates them into a single event
type EventAggregator struct {
sync.RWMutex
// The cache that manages aggregation state
cache *lru.Cache
// The function that groups events for aggregation
keyFunc EventAggregatorKeyFunc
// The function that generates a message for an aggregate event
messageFunc EventAggregatorMessageFunc
// The maximum number of events in the specified interval before aggregation occurs
maxEvents int
// The amount of time in seconds that must transpire since the last occurrence of a similar event before it's considered new
maxIntervalInSeconds int
// clock is used to allow for testing over a time interval
clock util.Clock
}
// NewEventAggregator returns a new instance of an EventAggregator
func NewEventAggregator(lruCacheSize int, keyFunc EventAggregatorKeyFunc, messageFunc EventAggregatorMessageFunc,
maxEvents int, maxIntervalInSeconds int, clock util.Clock) *EventAggregator {
return &EventAggregator{
cache: lru.New(lruCacheSize),
keyFunc: keyFunc,
messageFunc: messageFunc,
maxEvents: maxEvents,
maxIntervalInSeconds: maxIntervalInSeconds,
clock: clock,
}
}
// aggregateRecord holds data used to perform aggregation decisions
type aggregateRecord struct {
// we track the number of unique local keys we have seen in the aggregate set to know when to actually aggregate
// if the size of this set exceeds the max, we know we need to aggregate
localKeys sets.String
// The last time at which the aggregate was recorded
lastTimestamp unversioned.Time
}
// EventAggregate identifies similar events and groups them into a common event if required
func (e *EventAggregator) EventAggregate(newEvent *api.Event) (*api.Event, error) {
aggregateKey, localKey := e.keyFunc(newEvent)
now := unversioned.NewTime(e.clock.Now())
record := aggregateRecord{localKeys: sets.NewString(), lastTimestamp: now}
e.Lock()
defer e.Unlock()
value, found := e.cache.Get(aggregateKey)
if found {
record = value.(aggregateRecord)
}
// if the last event was far enough in the past, it is not aggregated, and we must reset state
maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
interval := now.Time.Sub(record.lastTimestamp.Time)
if interval > maxInterval {
record = aggregateRecord{localKeys: sets.NewString()}
}
record.localKeys.Insert(localKey)
record.lastTimestamp = now
e.cache.Add(aggregateKey, record)
if record.localKeys.Len() < e.maxEvents {
return newEvent, nil
}
// do not grow our local key set any larger than max
record.localKeys.PopAny()
// create a new aggregate event
eventCopy := &api.Event{
ObjectMeta: api.ObjectMeta{
Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
Namespace: newEvent.Namespace,
},
Count: 1,
FirstTimestamp: now,
InvolvedObject: newEvent.InvolvedObject,
LastTimestamp: now,
Message: e.messageFunc(newEvent),
Reason: newEvent.Reason,
Source: newEvent.Source,
}
return eventCopy, nil
}
// eventLog records data about when an event was observed
type eventLog struct {
// The number of times the event has occurred since first occurrence.
count int
// The time at which the event was first recorded.
firstTimestamp unversioned.Time
// The unique name of the first occurrence of this event
name string
// Resource version returned from previous interaction with server
resourceVersion string
}
// eventLogger logs occurrences of an event
type eventLogger struct {
sync.RWMutex
cache *lru.Cache
clock util.Clock
}
func NewEventCache() *historyCache {
return &historyCache{cache: lru.New(maxLruCacheEntries)}
// newEventLogger observes events and counts their frequencies
func newEventLogger(lruCacheEntries int, clock util.Clock) *eventLogger {
return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
}
// addOrUpdateEvent creates a new entry for the given event in the previous events hash table if the event
// doesn't already exist, otherwise it updates the existing entry.
func (eventCache *historyCache) addOrUpdateEvent(newEvent *api.Event) history {
// eventObserve records the event, and determines if its frequency should update
func (e *eventLogger) eventObserve(newEvent *api.Event) (*api.Event, []byte, error) {
var (
patch []byte
err error
)
key := getEventKey(newEvent)
eventCache.Lock()
defer eventCache.Unlock()
eventCache.cache.Add(
eventCopy := *newEvent
event := &eventCopy
e.Lock()
defer e.Unlock()
lastObservation := e.lastEventObservationFromCache(key)
// we have seen this event before, so we must prepare a patch
if lastObservation.count > 0 {
// update the event based on the last observation so patch will work as desired
event.Name = lastObservation.name
event.ResourceVersion = lastObservation.resourceVersion
event.FirstTimestamp = lastObservation.firstTimestamp
event.Count = lastObservation.count + 1
eventCopy2 := *event
eventCopy2.Count = 0
eventCopy2.LastTimestamp = unversioned.NewTime(time.Unix(0, 0))
newData, _ := json.Marshal(event)
oldData, _ := json.Marshal(eventCopy2)
patch, err = strategicpatch.CreateStrategicMergePatch(oldData, newData, event)
}
// record our new observation
e.cache.Add(
key,
history{
Count: newEvent.Count,
FirstTimestamp: newEvent.FirstTimestamp,
Name: newEvent.Name,
ResourceVersion: newEvent.ResourceVersion,
})
return eventCache.getEventFromCache(key)
eventLog{
count: event.Count,
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
return event, patch, err
}
// getEvent returns the entry corresponding to the given event, if one exists, otherwise a history object
// with a count of 0 is returned.
func (eventCache *historyCache) getEvent(event *api.Event) history {
// updateState updates its internal tracking information based on latest server state
func (e *eventLogger) updateState(event *api.Event) {
key := getEventKey(event)
eventCache.RLock()
defer eventCache.RUnlock()
return eventCache.getEventFromCache(key)
e.Lock()
defer e.Unlock()
// record our new observation
e.cache.Add(
key,
eventLog{
count: event.Count,
firstTimestamp: event.FirstTimestamp,
name: event.Name,
resourceVersion: event.ResourceVersion,
},
)
}
func (eventCache *historyCache) getEventFromCache(key string) history {
value, ok := eventCache.cache.Get(key)
// lastEventObservationFromCache returns the event from the cache, reads must be protected via external lock
func (e *eventLogger) lastEventObservationFromCache(key string) eventLog {
value, ok := e.cache.Get(key)
if ok {
historyValue, ok := value.(history)
observationValue, ok := value.(eventLog)
if ok {
return historyValue
return observationValue
}
}
return history{}
return eventLog{}
}
func getEventKey(event *api.Event) string {
return event.Source.Component +
event.Source.Host +
event.InvolvedObject.Kind +
event.InvolvedObject.Namespace +
event.InvolvedObject.Name +
string(event.InvolvedObject.UID) +
event.InvolvedObject.APIVersion +
event.Reason +
event.Message
// EventCorrelator processes all incoming events and performs analysis to avoid overwhelming the system. It can filter all
// incoming events to see if the event should be filtered from further processing. It can aggregate similar events that occur
// frequently to protect the system from spamming events that are difficult for users to distinguish. It performs de-duplication
// to ensure events that are observed multiple times are compacted into a single event with increasing counts.
type EventCorrelator struct {
// the function to filter the event
filterFunc EventFilterFunc
// the object that performs event aggregation
aggregator *EventAggregator
// the object that observes events as they come through
logger *eventLogger
}
// EventCorrelateResult is the result of a Correlate
type EventCorrelateResult struct {
// the event after correlation
Event *api.Event
// if provided, perform a strategic patch when updating the record on the server
Patch []byte
// if true, do no further processing of the event
Skip bool
}
// NewEventCorrelator returns an EventCorrelator configured with default values.
//
// The EventCorrelator is responsible for event filtering, aggregating, and counting
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
// * No events are filtered from being recorded
// * Aggregation is performed if a similar event is recorded 10 times in a
// 10 minute rolling interval. A similar event is an event that varies only by
// the Event.Message field. Rather than recording the precise event, aggregation
// will create a new event whose message reports that it has combined events with
// the same reason.
// * Events are incrementally counted if the exact same event is encountered multiple
// times.
func NewEventCorrelator(clock util.Clock) *EventCorrelator {
cacheSize := maxLruCacheEntries
return &EventCorrelator{
filterFunc: DefaultEventFilterFunc,
aggregator: NewEventAggregator(
cacheSize,
EventAggregatorByReasonFunc,
EventAggregatorByReasonMessageFunc,
defaultAggregateMaxEvents,
defaultAggregateIntervalInSeconds,
clock),
logger: newEventLogger(cacheSize, clock),
}
}
// EventCorrelate filters, aggregates, counts, and de-duplicates all incoming events
func (c *EventCorrelator) EventCorrelate(newEvent *api.Event) (*EventCorrelateResult, error) {
if c.filterFunc(newEvent) {
return &EventCorrelateResult{Skip: true}, nil
}
aggregateEvent, err := c.aggregator.EventAggregate(newEvent)
if err != nil {
return &EventCorrelateResult{}, err
}
observedEvent, patch, err := c.logger.eventObserve(aggregateEvent)
return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
}
// UpdateState updates internal tracking based on the latest observed state from the server
func (c *EventCorrelator) UpdateState(event *api.Event) {
c.logger.updateState(event)
}

View File

@ -17,26 +17,32 @@ limitations under the License.
package record
import (
"reflect"
"strings"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/util"
)
func TestAddOrUpdateEventNoExisting(t *testing.T) {
// Arrange
eventCache := NewEventCache()
func makeObjectReference(kind, name, namespace string) api.ObjectReference {
return api.ObjectReference{
Kind: kind,
Name: name,
Namespace: namespace,
UID: "C934D34AFB20242",
APIVersion: "version",
}
}
func makeEvent(reason, message string, involvedObject api.ObjectReference) api.Event {
eventTime := unversioned.Now()
event := api.Event{
Reason: "my reasons are many",
Message: "my message is love",
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "awesome.name",
Namespace: "betterNamespace",
UID: "C934D34AFB20242",
APIVersion: "version",
},
Reason: reason,
Message: message,
InvolvedObject: involvedObject,
Source: api.EventSource{
Component: "kubelet",
Host: "kublet.node1",
@ -45,148 +51,202 @@ func TestAddOrUpdateEventNoExisting(t *testing.T) {
FirstTimestamp: eventTime,
LastTimestamp: eventTime,
}
// Act
result := eventCache.addOrUpdateEvent(&event)
// Assert
compareEventWithHistoryEntry(&event, &result, t)
return event
}
func TestAddOrUpdateEventExisting(t *testing.T) {
// Arrange
eventCache := NewEventCache()
event1Time := unversioned.Unix(2324, 2342)
event2Time := unversioned.Now()
event1 := api.Event{
Reason: "something happened",
Message: "can you believe it?",
ObjectMeta: api.ObjectMeta{
ResourceVersion: "rs1",
},
InvolvedObject: api.ObjectReference{
Kind: "Scheduler",
Name: "anOkName",
Namespace: "someNamespace",
UID: "C934D3234CD0242",
APIVersion: "version",
},
Source: api.EventSource{
Component: "kubelet",
Host: "kublet.node2",
},
Count: 1,
FirstTimestamp: event1Time,
LastTimestamp: event1Time,
func makeEvents(num int, template api.Event) []api.Event {
events := []api.Event{}
for i := 0; i < num; i++ {
events = append(events, template)
}
event2 := api.Event{
Reason: "something happened",
Message: "can you believe it?",
ObjectMeta: api.ObjectMeta{
ResourceVersion: "rs2",
},
InvolvedObject: api.ObjectReference{
Kind: "Scheduler",
Name: "anOkName",
Namespace: "someNamespace",
UID: "C934D3234CD0242",
APIVersion: "version",
},
Source: api.EventSource{
Component: "kubelet",
Host: "kublet.node2",
},
Count: 3,
FirstTimestamp: event1Time,
LastTimestamp: event2Time,
}
// Act
eventCache.addOrUpdateEvent(&event1)
result1 := eventCache.addOrUpdateEvent(&event2)
result2 := eventCache.getEvent(&event1)
// Assert
compareEventWithHistoryEntry(&event2, &result1, t)
compareEventWithHistoryEntry(&event2, &result2, t)
return events
}
func TestGetEventNoExisting(t *testing.T) {
// Arrange
eventCache := NewEventCache()
event := api.Event{
Reason: "to be or not to be",
Message: "do I exist",
InvolvedObject: api.ObjectReference{
Kind: "Controller",
Name: "iAmAController",
Namespace: "IHaveANamespace",
UID: "9039D34AFBCDA42",
APIVersion: "version",
},
Source: api.EventSource{
Component: "kubelet",
Host: "kublet.node3",
},
Count: 1,
func makeUniqueEvents(num int) []api.Event {
events := []api.Event{}
kind := "Pod"
for i := 0; i < num; i++ {
reason := strings.Join([]string{"reason", strconv.Itoa(i)}, "-")
message := strings.Join([]string{"message", strconv.Itoa(i)}, "-")
name := strings.Join([]string{"pod", strconv.Itoa(i)}, "-")
namespace := strings.Join([]string{"ns", strconv.Itoa(i)}, "-")
involvedObject := makeObjectReference(kind, name, namespace)
events = append(events, makeEvent(reason, message, involvedObject))
}
return events
}
// Act
existingEvent := eventCache.getEvent(&event)
func makeSimilarEvents(num int, template api.Event, messagePrefix string) []api.Event {
events := makeEvents(num, template)
for i := range events {
events[i].Message = strings.Join([]string{messagePrefix, strconv.Itoa(i), events[i].Message}, "-")
}
return events
}
// Assert
if existingEvent.Count != 0 {
t.Fatalf("There should be no existing instance of this event in the hash table.")
func setCount(event api.Event, count int) api.Event {
event.Count = count
return event
}
func validateEvent(messagePrefix string, actualEvent *api.Event, expectedEvent *api.Event, t *testing.T) (*api.Event, error) {
recvEvent := *actualEvent
expectCompression := expectedEvent.Count > 1
t.Logf("%v - expectedEvent.Count is %d\n", messagePrefix, expectedEvent.Count)
// Just check that the timestamp was set.
if recvEvent.FirstTimestamp.IsZero() || recvEvent.LastTimestamp.IsZero() {
t.Errorf("%v - timestamp wasn't set: %#v", messagePrefix, recvEvent)
}
actualFirstTimestamp := recvEvent.FirstTimestamp
actualLastTimestamp := recvEvent.LastTimestamp
if actualFirstTimestamp.Equal(actualLastTimestamp) {
if expectCompression {
t.Errorf("%v - FirstTimestamp (%q) and LastTimestamp (%q) must be different to indicate event compression happened, but were the same. Actual Event: %#v", messagePrefix, actualFirstTimestamp, actualLastTimestamp, recvEvent)
}
} else {
if expectedEvent.Count == 1 {
t.Errorf("%v - FirstTimestamp (%q) and LastTimestamp (%q) must be equal to indicate only one occurrence of the event, but were different. Actual Event: %#v", messagePrefix, actualFirstTimestamp, actualLastTimestamp, recvEvent)
}
}
// Temp clear time stamps for comparison because actual values don't matter for comparison
recvEvent.FirstTimestamp = expectedEvent.FirstTimestamp
recvEvent.LastTimestamp = expectedEvent.LastTimestamp
// Check that name has the right prefix.
if n, en := recvEvent.Name, expectedEvent.Name; !strings.HasPrefix(n, en) {
t.Errorf("%v - Name '%v' does not contain prefix '%v'", messagePrefix, n, en)
}
recvEvent.Name = expectedEvent.Name
if e, a := expectedEvent, &recvEvent; !reflect.DeepEqual(e, a) {
t.Errorf("%v - diff: %s", messagePrefix, util.ObjectGoPrintDiff(e, a))
}
recvEvent.FirstTimestamp = actualFirstTimestamp
recvEvent.LastTimestamp = actualLastTimestamp
return actualEvent, nil
}
// TestDefaultEventFilterFunc ensures that no events are filtered
func TestDefaultEventFilterFunc(t *testing.T) {
event := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
if DefaultEventFilterFunc(&event) {
t.Fatalf("DefaultEventFilterFunc should always return false")
}
}
func TestGetEventExisting(t *testing.T) {
// Arrange
eventCache := NewEventCache()
eventTime := unversioned.Now()
event := api.Event{
Reason: "do I exist",
Message: "I do, oh my",
InvolvedObject: api.ObjectReference{
Kind: "Pod",
Name: "clever.name.here",
Namespace: "spaceOfName",
UID: "D933D32AFB2A238",
APIVersion: "version",
},
Source: api.EventSource{
Component: "kubelet",
Host: "kublet.node4",
},
Count: 1,
FirstTimestamp: eventTime,
LastTimestamp: eventTime,
// TestEventAggregatorByReasonFunc ensures that two events are aggregated if they vary only by event.Message
func TestEventAggregatorByReasonFunc(t *testing.T) {
event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
event2 := makeEvent("end-of-world", "it was awful", makeObjectReference("Pod", "pod1", "other"))
event3 := makeEvent("nevermind", "it was a bug", makeObjectReference("Pod", "pod1", "other"))
aggKey1, localKey1 := EventAggregatorByReasonFunc(&event1)
aggKey2, localKey2 := EventAggregatorByReasonFunc(&event2)
aggKey3, _ := EventAggregatorByReasonFunc(&event3)
if aggKey1 != aggKey2 {
t.Errorf("Expected %v equal %v", aggKey1, aggKey2)
}
if localKey1 == localKey2 {
t.Errorf("Expected %v to not equal %v", aggKey1, aggKey3)
}
if aggKey1 == aggKey3 {
t.Errorf("Expected %v to not equal %v", aggKey1, aggKey3)
}
eventCache.addOrUpdateEvent(&event)
// Act
existingEvent := eventCache.getEvent(&event)
// Assert
compareEventWithHistoryEntry(&event, &existingEvent, t)
}
func compareEventWithHistoryEntry(expected *api.Event, actual *history, t *testing.T) {
if actual.Count != expected.Count {
t.Fatalf("There should be one existing instance of this event in the hash table.")
// TestEventAggregatorByReasonMessageFunc validates the proper output for an aggregate message
func TestEventAggregatorByReasonMessageFunc(t *testing.T) {
expected := "(events with common reason combined)"
event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
if actual := EventAggregatorByReasonMessageFunc(&event1); expected != actual {
t.Errorf("Expected %v got %v", expected, actual)
}
}
// TestEventCorrelator validates proper counting and aggregation of events
func TestEventCorrelator(t *testing.T) {
firstEvent := makeEvent("first", "i am first", makeObjectReference("Pod", "my-pod", "my-ns"))
duplicateEvent := makeEvent("duplicate", "me again", makeObjectReference("Pod", "my-pod", "my-ns"))
uniqueEvent := makeEvent("unique", "snowflake", makeObjectReference("Pod", "my-pod", "my-ns"))
similarEvent := makeEvent("similar", "similar message", makeObjectReference("Pod", "my-pod", "my-ns"))
aggregateEvent := makeEvent(similarEvent.Reason, EventAggregatorByReasonMessageFunc(&similarEvent), similarEvent.InvolvedObject)
scenario := map[string]struct {
previousEvents []api.Event
newEvent api.Event
expectedEvent api.Event
intervalSeconds int
}{
"create-a-single-event": {
previousEvents: []api.Event{},
newEvent: firstEvent,
expectedEvent: setCount(firstEvent, 1),
intervalSeconds: 5,
},
"the-same-event-should-just-count": {
previousEvents: makeEvents(1, duplicateEvent),
newEvent: duplicateEvent,
expectedEvent: setCount(duplicateEvent, 2),
intervalSeconds: 5,
},
"the-same-event-should-just-count-even-if-more-than-aggregate": {
previousEvents: makeEvents(defaultAggregateMaxEvents, duplicateEvent),
newEvent: duplicateEvent,
expectedEvent: setCount(duplicateEvent, defaultAggregateMaxEvents+1),
intervalSeconds: 5,
},
"create-many-unique-events": {
previousEvents: makeUniqueEvents(30),
newEvent: uniqueEvent,
expectedEvent: setCount(uniqueEvent, 1),
intervalSeconds: 5,
},
"similar-events-should-aggregate-event": {
previousEvents: makeSimilarEvents(defaultAggregateMaxEvents-1, similarEvent, similarEvent.Message),
newEvent: similarEvent,
expectedEvent: setCount(aggregateEvent, 1),
intervalSeconds: 5,
},
"similar-events-many-times-should-count-the-aggregate": {
previousEvents: makeSimilarEvents(defaultAggregateMaxEvents, similarEvent, similarEvent.Message),
newEvent: similarEvent,
expectedEvent: setCount(aggregateEvent, 2),
intervalSeconds: 5,
},
"similar-events-whose-interval-is-greater-than-aggregate-interval-do-not-aggregate": {
previousEvents: makeSimilarEvents(defaultAggregateMaxEvents-1, similarEvent, similarEvent.Message),
newEvent: similarEvent,
expectedEvent: setCount(similarEvent, 1),
intervalSeconds: defaultAggregateIntervalInSeconds,
},
}
for testScenario, testInput := range scenario {
eventInterval := time.Duration(testInput.intervalSeconds) * time.Second
clock := util.IntervalClock{Time: time.Now(), Duration: eventInterval}
correlator := NewEventCorrelator(&clock)
for i := range testInput.previousEvents {
event := testInput.previousEvents[i]
now := unversioned.NewTime(clock.Now())
event.FirstTimestamp = now
event.LastTimestamp = now
result, err := correlator.EventCorrelate(&event)
if err != nil {
t.Errorf("scenario %v: unexpected error playing back prevEvents %v", testScenario, err)
}
correlator.UpdateState(result.Event)
}
// update the input to current clock value
now := unversioned.NewTime(clock.Now())
testInput.newEvent.FirstTimestamp = now
testInput.newEvent.LastTimestamp = now
result, err := correlator.EventCorrelate(&testInput.newEvent)
if err != nil {
t.Errorf("scenario %v: unexpected error correlating input event %v", testScenario, err)
}
_, err = validateEvent(testScenario, result.Event, &testInput.expectedEvent, t)
if err != nil {
t.Errorf("scenario %v: unexpected error validating result %v", testScenario, err)
}
}
if !actual.FirstTimestamp.Equal(expected.FirstTimestamp) {
t.Fatalf("Unexpected FirstTimestamp. Expected: <%v> Actual: <%v>", expected.FirstTimestamp, actual.FirstTimestamp)
}
if actual.Name != expected.Name {
t.Fatalf("Unexpected Name. Expected: <%v> Actual: <%v>", expected.Name, actual.Name)
}
if actual.ResourceVersion != expected.ResourceVersion {
t.Fatalf("Unexpected ResourceVersion. Expected: <%v> Actual: <%v>", expected.ResourceVersion, actual.ResourceVersion)
}
}

View File

@ -59,3 +59,20 @@ func (f *FakeClock) Since(ts time.Time) time.Duration {
func (f *FakeClock) Step(d time.Duration) {
f.Time = f.Time.Add(d)
}
// IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration
type IntervalClock struct {
Time time.Time
Duration time.Duration
}
// Now advances i's time by the configured Duration and returns the result.
func (i *IntervalClock) Now() time.Time {
i.Time = i.Time.Add(i.Duration)
return i.Time
}
// Since returns time since the time in i.
func (i *IntervalClock) Since(ts time.Time) time.Duration {
return i.Time.Sub(ts)
}