stream: Return a single event from a subscription.Next

Handle batch events as a single event
pull/8818/head
Daniel Nephin 2020-10-05 12:38:38 -04:00
parent f5d11562f2
commit b27068b72a
7 changed files with 208 additions and 202 deletions

View File

@ -5,10 +5,11 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestStore_IntegrationWithEventPublisher_ACLTokenUpdate(t *testing.T) {
@ -294,7 +295,7 @@ func TestStore_IntegrationWithEventPublisher_ACLRoleUpdate(t *testing.T) {
}
type nextResult struct {
Events []stream.Event
Event stream.Event
Err error
}
@ -304,7 +305,7 @@ func testRunSub(sub *stream.Subscription) <-chan nextResult {
for {
es, err := sub.Next(context.TODO())
eventCh <- nextResult{
Events: es,
Event: es,
Err: err,
}
if err != nil {
@ -320,8 +321,8 @@ func assertNoEvent(t *testing.T, eventCh <-chan nextResult) {
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("got unwanted event: %#v", next.Events[0].Payload)
require.Len(t, next.Event, 1)
t.Fatalf("got unwanted event: %#v", next.Event.Payload)
case <-time.After(100 * time.Millisecond):
}
}
@ -331,8 +332,7 @@ func assertEvent(t *testing.T, eventCh <-chan nextResult) *stream.Event {
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
return &next.Events[0]
return &next.Event
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
@ -362,7 +362,7 @@ func assertReset(t *testing.T, eventCh <-chan nextResult, allowEOS bool) {
select {
case next := <-eventCh:
if allowEOS {
if next.Err == nil && len(next.Events) == 1 && next.Events[0].IsEndOfSnapshot() {
if next.Err == nil && next.Event.IsEndOfSnapshot() {
continue
}
}

View File

@ -19,6 +19,51 @@ type Event struct {
Payload interface{}
}
// Len returns the number of events contained within this event. If the Payload
// is a []Event, the length of that slice is returned. Otherwise 1 is returned.
func (e Event) Len() int {
if batch, ok := e.Payload.([]Event); ok {
return len(batch)
}
return 1
}
// Filter returns an Event filtered to only those Events where f returns true.
// If the second return value is false, every Event was removed by the filter.
func (e Event) Filter(f func(Event) bool) (Event, bool) {
batch, ok := e.Payload.([]Event)
if !ok {
return e, f(e)
}
// To avoid extra allocations, iterate over the list of events first and
// get a count of the total desired size. This trades off some extra cpu
// time in the worse case (when not all items match the filter), for
// fewer memory allocations.
var size int
for idx := range batch {
if f(batch[idx]) {
size++
}
}
if len(batch) == size || size == 0 {
return e, size != 0
}
filtered := make([]Event, 0, size)
for idx := range batch {
event := batch[idx]
if f(event) {
filtered = append(filtered, event)
}
}
if len(filtered) == 0 {
return e, false
}
e.Payload = filtered
return e, true
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Subsequent events from Subscription.Next will be
// streamed as they occur.

View File

@ -32,13 +32,11 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next := getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
assertNoResult(t, eventCh)
@ -50,8 +48,8 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
publisher.Publish(events)
// Subscriber should see the published event
next = getNextEvents(t, eventCh)
expected = []Event{{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}}
next = getNextEvent(t, eventCh)
expected := Event{Payload: "the-published-event-payload", Key: "sub-key", Topic: testTopic}
require.Equal(t, expected, next)
}
@ -80,7 +78,7 @@ func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr {
for {
es, err := sub.Next(ctx)
eventCh <- eventOrErr{
Events: es,
Event: es,
Err: err,
}
if err != nil {
@ -92,19 +90,19 @@ func runSubscription(ctx context.Context, sub *Subscription) <-chan eventOrErr {
}
type eventOrErr struct {
Events []Event
Event Event
Err error
}
func getNextEvents(t *testing.T, eventCh <-chan eventOrErr) []Event {
func getNextEvent(t *testing.T, eventCh <-chan eventOrErr) Event {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
return next.Events
return next.Event
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout waiting for event from subscription")
return nil
return Event{}
}
}
@ -113,8 +111,7 @@ func assertNoResult(t *testing.T, eventCh <-chan eventOrErr) {
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
t.Fatalf("received unexpected event: %#v", next.Event.Payload)
case <-time.After(25 * time.Millisecond):
}
}
@ -152,11 +149,11 @@ func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
func consumeSub(ctx context.Context, sub *Subscription) error {
for {
events, err := sub.Next(ctx)
event, err := sub.Next(ctx)
switch {
case err != nil:
return err
case len(events) == 1 && events[0].IsEndOfSnapshot():
case event.IsEndOfSnapshot():
continue
}
}
@ -183,28 +180,25 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next := getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
expected := Event{
Topic: testTopic,
Key: "sub-key",
Payload: "the-published-event-payload",
Index: 3,
}}
publisher.Publish(events)
}
publisher.Publish([]Event{expected})
// Subscriber should see the published event
next = getNextEvents(t, eventCh)
expected = []Event{events[0]}
next = getNextEvent(t, eventCh)
require.Equal(t, expected, next)
}
@ -228,14 +222,12 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next := getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
require.Equal(t, uint64(1), next.Index)
})
runStep(t, "resume the subscription", func(t *testing.T) {
@ -255,8 +247,8 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
}
publisher.publishEvent([]Event{expected})
next := getNextEvents(t, eventCh)
require.Equal(t, []Event{expected}, next)
next := getNextEvent(t, eventCh)
require.Equal(t, expected, next)
})
}
@ -280,14 +272,12 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next := getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
require.Equal(t, uint64(1), next.Index)
})
nextEvent := Event{
@ -308,14 +298,14 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot(t *testing.T) {
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next := getNextEvent(t, eventCh)
require.True(t, next.IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
})
}
@ -339,14 +329,12 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
expected := []Event{testSnapshotEvent}
require.Equal(t, expected, next)
next := getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.Len(t, next, 1)
require.True(t, next[0].IsEndOfSnapshot())
require.Equal(t, uint64(1), next[0].Index)
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
require.Equal(t, uint64(1), next.Index)
})
nextEvent := Event{
@ -371,17 +359,17 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
require.NoError(t, err)
eventCh := runSubscription(ctx, sub)
next := getNextEvents(t, eventCh)
require.True(t, next[0].IsNewSnapshotToFollow(), next)
next := getNextEvent(t, eventCh)
require.True(t, next.IsNewSnapshotToFollow(), next)
next = getNextEvents(t, eventCh)
require.Equal(t, testSnapshotEvent, next[0])
next = getNextEvent(t, eventCh)
require.Equal(t, testSnapshotEvent, next)
next = getNextEvents(t, eventCh)
require.True(t, next[0].IsEndOfSnapshot())
next = getNextEvent(t, eventCh)
require.True(t, next.IsEndOfSnapshot())
next = getNextEvents(t, eventCh)
require.Equal(t, nextEvent, next[0])
next = getNextEvent(t, eventCh)
require.Equal(t, nextEvent, next)
})
}

View File

@ -65,59 +65,56 @@ func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subs
}
}
// Next returns the next set of events to deliver. It must only be called from a
// Next returns the next Event to deliver. It must only be called from a
// single goroutine concurrently as it mutates the Subscription.
func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
func (s *Subscription) Next(ctx context.Context) (Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
return Event{}, ErrSubscriptionClosed
}
for {
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return nil, ErrSubscriptionClosed
return Event{}, ErrSubscriptionClosed
case err != nil:
return nil, err
return Event{}, err
}
s.currentItem = next
events := filter(s.req.Key, next.Events)
if len(events) == 0 {
if len(next.Events) == 0 {
continue
}
return events, nil
event, ok := filterByKey(s.req, next.Events)
if !ok {
continue
}
return event, nil
}
}
// filter events to only those that match the key exactly.
func filter(key string, events []Event) []Event {
if key == "" || len(events) == 0 {
return events
func newEventFromBatch(req SubscribeRequest, events []Event) Event {
first := events[0]
if len(events) == 1 {
return first
}
return Event{
Topic: req.Topic,
Key: req.Key,
Index: first.Index,
Payload: events,
}
}
func filterByKey(req SubscribeRequest, events []Event) (Event, bool) {
event := newEventFromBatch(req, events)
if req.Key == "" {
return event, true
}
var count int
for _, e := range events {
if key == e.Key {
count++
fn := func(e Event) bool {
return req.Key == e.Key
}
}
// Only allocate a new slice if some events need to be filtered out
switch count {
case 0:
return nil
case len(events):
return events
}
result := make([]Event, 0, count)
for _, e := range events {
if key == e.Key {
result = append(result, e)
}
}
return result
return event.Filter(fn)
}
// Close the subscription. Subscribers will receive an error when they call Next,

View File

@ -36,8 +36,7 @@ func TestSubscription(t *testing.T) {
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
require.Equal(t, index, got.Index)
// Schedule an event publish in a while
index++
@ -54,8 +53,7 @@ func TestSubscription(t *testing.T) {
"Event should have been delivered after blocking 200ms, took %s", elapsed)
require.True(t, elapsed < 2*time.Second,
"Event should have been delivered after short time, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
require.Equal(t, index, got.Index)
// Event with wrong key should not be delivered. Deliver a good message right
// so we don't have to block test thread forever or cancel func yet.
@ -70,9 +68,8 @@ func TestSubscription(t *testing.T) {
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
require.Equal(t, "test", got[0].Key)
require.Equal(t, index, got.Index)
require.Equal(t, "test", got.Key)
// Cancelling the subscription context should unblock Next
start = time.Now()
@ -91,9 +88,7 @@ func TestSubscription(t *testing.T) {
func TestSubscription_Close(t *testing.T) {
eb := newEventBuffer()
index := uint64(100)
startHead := eb.Head()
// Start with an event in the buffer
@ -115,8 +110,7 @@ func TestSubscription_Close(t *testing.T) {
require.NoError(t, err)
require.True(t, elapsed < 200*time.Millisecond,
"Event should have been delivered immediately, took %s", elapsed)
require.Len(t, got, 1)
require.Equal(t, index, got[0].Index)
require.Equal(t, index, got.Index)
// Schedule a Close simulating the server deciding this subscroption
// needs to reset (e.g. on ACL perm change).
@ -149,46 +143,55 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) {
func TestFilter_NoKey(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "One"}, Event{Key: "Two"})
events = append(events, Event{Key: "One", Index: 102}, Event{Key: "Two"})
actual := filter("", events)
require.Equal(t, events, actual)
req := SubscribeRequest{Topic: testTopic}
actual, ok := filterByKey(req, events)
require.True(t, ok)
require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual)
// test that a new array was not allocated
require.Equal(t, cap(actual), 5)
require.Equal(t, cap(actual.Payload.([]Event)), 5)
}
func TestFilter_WithKey_AllEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Same"})
events = append(events, Event{Key: "Same", Index: 103}, Event{Key: "Same"})
actual := filter("Same", events)
require.Equal(t, events, actual)
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
actual, ok := filterByKey(req, events)
require.True(t, ok)
expected := Event{Topic: testTopic, Index: 103, Key: "Same", Payload: events}
require.Equal(t, expected, actual)
// test that a new array was not allocated
require.Equal(t, cap(actual), 5)
require.Equal(t, 5, cap(actual.Payload.([]Event)))
}
func TestFilter_WithKey_SomeEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Other"}, Event{Key: "Same"})
events = append(events, Event{Key: "Same", Index: 104}, Event{Key: "Other"}, Event{Key: "Same"})
actual := filter("Same", events)
expected := []Event{{Key: "Same"}, {Key: "Same"}}
req := SubscribeRequest{Topic: testTopic, Key: "Same"}
actual, ok := filterByKey(req, events)
require.True(t, ok)
expected := Event{
Topic: testTopic,
Index: 104,
Key: "Same",
Payload: []Event{{Key: "Same", Index: 104}, {Key: "Same"}},
}
require.Equal(t, expected, actual)
// test that a new array was allocated with the correct size
require.Equal(t, cap(actual), 2)
require.Equal(t, cap(actual.Payload.([]Event)), 2)
}
func TestFilter_WithKey_NoEventsMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Key: "Same"}, Event{Key: "Same"})
actual := filter("Other", events)
var expected []Event
require.Equal(t, expected, actual)
// test that no array was allocated
require.Equal(t, cap(actual), 0)
req := SubscribeRequest{Topic: testTopic, Key: "Other"}
_, ok := filterByKey(req, events)
require.False(t, ok)
}

View File

@ -52,21 +52,17 @@ type eventLogger struct {
count uint64
}
func (l *eventLogger) Trace(e []stream.Event) {
if len(e) == 0 {
return
}
first := e[0]
func (l *eventLogger) Trace(e stream.Event) {
switch {
case first.IsEndOfSnapshot():
case e.IsEndOfSnapshot():
l.snapshotDone = true
l.logger.Trace("snapshot complete", "index", first.Index, "sent", l.count)
case first.IsNewSnapshotToFollow():
l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count)
case e.IsNewSnapshotToFollow():
l.logger.Trace("starting new snapshot", "sent", l.count)
return
case l.snapshotDone:
l.logger.Trace("sending events", "index", first.Index, "sent", l.count, "batch_size", len(e))
l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len())
}
l.count += uint64(len(e))
l.count += uint64(e.Len())
}

View File

@ -67,7 +67,7 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
elog := &eventLogger{logger: logger}
for {
events, err := sub.Next(ctx)
event, err := sub.Next(ctx)
switch {
case errors.Is(err, stream.ErrSubscriptionClosed):
logger.Trace("subscription reset by server")
@ -76,13 +76,14 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
return err
}
events = filterStreamEvents(authz, events)
if len(events) == 0 {
var ok bool
event, ok = filterByAuth(authz, event)
if !ok {
continue
}
elog.Trace(events)
e := newEventFromStreamEvents(req, events)
elog.Trace(event)
e := newEventFromStreamEvent(req, event)
if err := serverStream.Send(e); err != nil {
return err
}
@ -126,45 +127,24 @@ func forwardToDC(
}
}
// filterStreamEvents to only those allowed by the acl token.
func filterStreamEvents(authz acl.Authorizer, events []stream.Event) []stream.Event {
// filterByAuth to only those Events allowed by the acl token.
func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) {
// authz will be nil when ACLs are disabled
if authz == nil || len(events) == 0 {
return events
if authz == nil {
return event, true
}
// Fast path for the common case of only 1 event since we can avoid slice
// allocation in the hot path of every single update event delivered in vast
// majority of cases with this. Note that this is called _per event/item_ when
// sending snapshots which is a lot worse than being called once on regular
// result.
if len(events) == 1 {
if enforceACL(authz, events[0]) == acl.Allow {
return events
fn := func(e stream.Event) bool {
return enforceACL(authz, e) == acl.Allow
}
return nil
}
var filtered []stream.Event
for idx := range events {
event := events[idx]
if enforceACL(authz, event) == acl.Allow {
filtered = append(filtered, event)
}
}
return filtered
return event.Filter(fn)
}
func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream.Event) *pbsubscribe.Event {
func newEventFromStreamEvent(req *pbsubscribe.SubscribeRequest, event stream.Event) *pbsubscribe.Event {
e := &pbsubscribe.Event{
Topic: req.Topic,
Key: req.Key,
Index: events[0].Index,
Index: event.Index,
}
if len(events) == 1 {
event := events[0]
// TODO: refactor so these are only checked once, instead of 3 times.
switch {
case event.IsEndOfSnapshot():
e.Payload = &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true}
@ -173,21 +153,18 @@ func newEventFromStreamEvents(req *pbsubscribe.SubscribeRequest, events []stream
e.Payload = &pbsubscribe.Event_NewSnapshotToFollow{NewSnapshotToFollow: true}
return e
}
setPayload(e, event.Payload)
return e
}
e.Payload = &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: batchEventsFromEventSlice(events),
},
}
return e
}
func setPayload(e *pbsubscribe.Event, payload interface{}) {
switch p := payload.(type) {
case []stream.Event:
e.Payload = &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{
Events: batchEventsFromEventSlice(p),
},
}
case state.EventPayloadCheckServiceNode:
e.Payload = &pbsubscribe.Event_ServiceHealth{
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{