stream: move event filtering to PayloadEvents

Removes the weirdness around PayloadEvents.FilterByKey
pull/9114/head
Daniel Nephin 4 years ago
parent 79b5ca1ce6
commit 36202f7938

@ -24,62 +24,55 @@ type Payload interface {
// Generally this means that the payload matches the key and namespace or // Generally this means that the payload matches the key and namespace or
// the payload is a special framing event that should be returned to every // the payload is a special framing event that should be returned to every
// subscription. // subscription.
// TODO: rename to MatchesKey
FilterByKey(key, namespace string) bool FilterByKey(key, namespace string) bool
} }
// Len returns the number of events contained within this event. If the Payload // PayloadEvents is an Payload which contains multiple Events.
// is a []Event, the length of that slice is returned. Otherwise 1 is returned. type PayloadEvents struct {
func (e Event) Len() int { Items []Event
if batch, ok := e.Payload.(PayloadEvents); ok {
return len(batch)
}
return 1
} }
// Filter returns an Event filtered to only those Events where f returns true. func NewPayloadEvents(items ...Event) *PayloadEvents {
// If the second return value is false, every Event was removed by the filter. return &PayloadEvents{Items: items}
func (e Event) Filter(f func(Event) bool) (Event, bool) { }
batch, ok := e.Payload.(PayloadEvents)
if !ok { func (p *PayloadEvents) filter(f func(Event) bool) bool {
return e, f(e) items := p.Items
}
// To avoid extra allocations, iterate over the list of events first and // To avoid extra allocations, iterate over the list of events first and
// get a count of the total desired size. This trades off some extra cpu // get a count of the total desired size. This trades off some extra cpu
// time in the worse case (when not all items match the filter), for // time in the worse case (when not all items match the filter), for
// fewer memory allocations. // fewer memory allocations.
var size int var size int
for idx := range batch { for idx := range items {
if f(batch[idx]) { if f(items[idx]) {
size++ size++
} }
} }
if len(batch) == size || size == 0 { if len(items) == size || size == 0 {
return e, size != 0 return size != 0
} }
filtered := make(PayloadEvents, 0, size) filtered := make([]Event, 0, size)
for idx := range batch { for idx := range items {
event := batch[idx] event := items[idx]
if f(event) { if f(event) {
filtered = append(filtered, event) filtered = append(filtered, event)
} }
} }
if len(filtered) == 0 { p.Items = filtered
return e, false return true
}
e.Payload = filtered
return e, true
} }
// PayloadEvents is an Payload which contains multiple Events. func (p *PayloadEvents) FilterByKey(key, namespace string) bool {
type PayloadEvents []Event return p.filter(func(event Event) bool {
return event.Payload.FilterByKey(key, namespace)
})
}
// TODO: this method is not called, but needs to exist so that we can store func (p *PayloadEvents) Len() int {
// a slice of events as a payload. In the future we should be able to refactor return len(p.Items)
// Event.Filter so that this FilterByKey includes the re-slicing.
func (e PayloadEvents) FilterByKey(_, _ string) bool {
return true
} }
// IsEndOfSnapshot returns true if this is a framing event that indicates the // IsEndOfSnapshot returns true if this is a framing event that indicates the

@ -15,3 +15,134 @@ func TestEvent_IsEndOfSnapshot(t *testing.T) {
require.False(t, e.IsEndOfSnapshot()) require.False(t, e.IsEndOfSnapshot())
}) })
} }
func newSimpleEvent(key string, index uint64) Event {
return Event{Index: index, Payload: simplePayload{key: key}}
}
func TestPayloadEvents_FilterByKey(t *testing.T) {
type testCase struct {
name string
req SubscribeRequest
events []Event
expectEvent bool
expected *PayloadEvents
expectedCap int
}
fn := func(t *testing.T, tc testCase) {
events := make([]Event, 0, 5)
events = append(events, tc.events...)
pe := &PayloadEvents{Items: events}
ok := pe.FilterByKey(tc.req.Key, tc.req.Namespace)
require.Equal(t, tc.expectEvent, ok)
if !tc.expectEvent {
return
}
require.Equal(t, tc.expected, pe)
// test if there was a new array allocated or not
require.Equal(t, tc.expectedCap, cap(pe.Items))
}
var testCases = []testCase{
{
name: "all events match, no key or namespace",
req: SubscribeRequest{Topic: testTopic},
events: []Event{
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)},
expectEvent: true,
expected: NewPayloadEvents(
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)),
expectedCap: 5,
},
{
name: "all events match, no namespace",
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
events: []Event{
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)},
expectEvent: true,
expected: NewPayloadEvents(
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)),
expectedCap: 5,
},
{
name: "all events match, no key",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")},
expectEvent: true,
expected: NewPayloadEvents(
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")),
expectedCap: 5,
},
{
name: "some evens match, no namespace",
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
events: []Event{
newSimpleEvent("Same", 104),
newSimpleEvent("Other", 104),
newSimpleEvent("Same", 104)},
expectEvent: true,
expected: NewPayloadEvents(
newSimpleEvent("Same", 104),
newSimpleEvent("Same", 104)),
expectedCap: 2,
},
{
name: "some events match, no key",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("app1", "apps"),
newNSEvent("db1", "dbs"),
newNSEvent("app2", "apps")},
expectEvent: true,
expected: NewPayloadEvents(
newNSEvent("app1", "apps"),
newNSEvent("app2", "apps")),
expectedCap: 2,
},
{
name: "no events match key",
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
events: []Event{
newSimpleEvent("Same", 0),
newSimpleEvent("Same", 0)},
},
{
name: "no events match namespace",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("app1", "group1"),
newNSEvent("app2", "group2")},
expectEvent: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
func newNSEvent(key, namespace string) Event {
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
}
type nsPayload struct {
key string
namespace string
value string
}
func (p nsPayload) FilterByKey(key, namespace string) bool {
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
}

@ -101,8 +101,8 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) {
if len(next.Events) == 0 { if len(next.Events) == 0 {
continue continue
} }
event, ok := filterByKey(s.req, next.Events) event := newEventFromBatch(s.req, next.Events)
if !ok { if !event.Payload.FilterByKey(s.req.Key, s.req.Namespace) {
continue continue
} }
return event, nil return event, nil
@ -128,22 +128,10 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event {
return Event{ return Event{
Topic: req.Topic, Topic: req.Topic,
Index: first.Index, Index: first.Index,
Payload: PayloadEvents(events), Payload: NewPayloadEvents(events...),
} }
} }
func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
event := newEventFromBatch(req, events)
if req.Key == "" && req.Namespace == "" {
return event, true
}
fn := func(e Event) bool {
return e.Payload.FilterByKey(req.Key, req.Namespace)
}
return event.Filter(fn)
}
// Close the subscription. Subscribers will receive an error when they call Next, // Close the subscription. Subscribers will receive an error when they call Next,
// and will need to perform a new Subscribe request. // and will need to perform a new Subscribe request.
// It is safe to call from any goroutine. // It is safe to call from any goroutine.

@ -138,147 +138,29 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
b.Append([]Event{e}) b.Append([]Event{e})
} }
func newSimpleEvent(key string, index uint64) Event { func TestNewEventsFromBatch(t *testing.T) {
return Event{Index: index, Payload: simplePayload{key: key}} t.Run("single item", func(t *testing.T) {
} first := Event{
Topic: testTopic,
func TestFilterByKey(t *testing.T) { Index: 1234,
type testCase struct { Payload: simplePayload{key: "key"},
name string
req SubscribeRequest
events []Event
expectEvent bool
expected Event
expectedCap int
}
fn := func(t *testing.T, tc testCase) {
events := make(PayloadEvents, 0, 5)
events = append(events, tc.events...)
actual, ok := filterByKey(tc.req, events)
require.Equal(t, tc.expectEvent, ok)
if !tc.expectEvent {
return
} }
e := newEventFromBatch(SubscribeRequest{}, []Event{first})
require.Equal(t, tc.expected, actual) require.Equal(t, first, e)
// test if there was a new array allocated or not })
require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents))) t.Run("many items", func(t *testing.T) {
} events := []Event{
newSimpleEvent("foo", 9999),
var testCases = []testCase{ newSimpleEvent("foo", 9999),
{ newSimpleEvent("zee", 9999),
name: "all events match, no key or namespace", }
req: SubscribeRequest{Topic: testTopic}, req := SubscribeRequest{Topic: testTopic}
events: []Event{ e := newEventFromBatch(req, events)
newSimpleEvent("One", 102), expected := Event{
newSimpleEvent("Two", 102)}, Topic: testTopic,
expectEvent: true, Index: 9999,
expected: Event{ Payload: NewPayloadEvents(events...),
Topic: testTopic, }
Index: 102, require.Equal(t, expected, e)
Payload: PayloadEvents{ })
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)}},
expectedCap: 5,
},
{
name: "all events match, no namespace",
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
events: []Event{
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)},
expectEvent: true,
expected: Event{
Topic: testTopic,
Index: 103,
Payload: PayloadEvents{
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)}},
expectedCap: 5,
},
{
name: "all events match, no key",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")},
expectEvent: true,
expected: Event{
Topic: testTopic,
Index: 22,
Payload: PayloadEvents{
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")}},
expectedCap: 5,
},
{
name: "some evens match, no namespace",
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
events: []Event{
newSimpleEvent("Same", 104),
newSimpleEvent("Other", 104),
newSimpleEvent("Same", 104)},
expectEvent: true,
expected: Event{
Topic: testTopic,
Index: 104,
Payload: PayloadEvents{
newSimpleEvent("Same", 104),
newSimpleEvent("Same", 104)}},
expectedCap: 2,
},
{
name: "some events match, no key",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("app1", "apps"),
newNSEvent("db1", "dbs"),
newNSEvent("app2", "apps")},
expectEvent: true,
expected: Event{
Topic: testTopic,
Index: 22,
Payload: PayloadEvents{
newNSEvent("app1", "apps"),
newNSEvent("app2", "apps")}},
expectedCap: 2,
},
{
name: "no events match key",
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
events: []Event{
newSimpleEvent("Same", 0),
newSimpleEvent("Same", 0)},
},
{
name: "no events match namespace",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("app1", "group1"),
newNSEvent("app2", "group2")},
expectEvent: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
func newNSEvent(key, namespace string) Event {
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
}
type nsPayload struct {
key string
namespace string
value string
}
func (p nsPayload) FilterByKey(key, namespace string) bool {
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
} }

Loading…
Cancel
Save