streaming: split event buffer by key (#12080)

pull/12225/head
Dan Upton 2022-01-28 12:27:00 +00:00 committed by GitHub
parent c1cb58bdcb
commit fdfe079674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 429 additions and 376 deletions

3
.changelog/12080.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:enhancement
streaming: Improved performance when the server is handling many concurrent subscriptions and has a high number of CPU cores
```

View File

@ -32,31 +32,26 @@ func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bo
return e.Value.CanRead(authz) == acl.Allow
}
func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace, partition string) bool {
if key == "" && namespace == "" && partition == "" {
return true
}
if e.Value.Service == nil {
return false
}
name := e.Value.Service.Service
if e.overrideKey != "" {
name = e.overrideKey
}
ns := e.Value.Service.EnterpriseMeta.NamespaceOrDefault()
if e.overrideNamespace != "" {
ns = e.overrideNamespace
}
ap := e.Value.Service.EnterpriseMeta.PartitionOrDefault()
func (e EventPayloadCheckServiceNode) Subject() stream.Subject {
partition := e.Value.Service.PartitionOrDefault()
if e.overridePartition != "" {
ap = e.overridePartition
partition = e.overridePartition
}
partition = strings.ToLower(partition)
return (key == "" || strings.EqualFold(key, name)) &&
(namespace == "" || strings.EqualFold(namespace, ns)) &&
(partition == "" || strings.EqualFold(partition, ap))
namespace := e.Value.Service.NamespaceOrDefault()
if e.overrideNamespace != "" {
namespace = e.overrideNamespace
}
namespace = strings.ToLower(namespace)
key := e.Value.Service.Service
if e.overrideKey != "" {
key = e.overrideKey
}
key = strings.ToLower(key)
return stream.Subject(partition + "/" + namespace + "/" + key)
}
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
@ -67,8 +62,7 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
defer tx.Abort()
connect := topic == topicServiceHealthConnect
entMeta := structs.NewEnterpriseMetaWithPartition(req.Partition, req.Namespace)
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta)
idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &req.EnterpriseMeta)
if err != nil {
return 0, err
}

View File

@ -11,11 +11,106 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbcommon"
"github.com/hashicorp/consul/proto/pbsubscribe"
"github.com/hashicorp/consul/types"
)
func TestEventPayloadCheckServiceNode_SubjectMatchesRequests(t *testing.T) {
// Matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"default partition and namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "foo",
EnterpriseMeta: structs.EnterpriseMeta{},
},
},
"mixed casing": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "FoO",
},
},
},
stream.SubscribeRequest{Key: "foo"},
},
"override key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideKey: "bar",
},
stream.SubscribeRequest{Key: "bar"},
},
} {
t.Run(desc, func(t *testing.T) {
require.Equal(t, tc.req.Subject(), tc.evt.Subject())
})
}
// Non-matches.
for desc, tc := range map[string]struct {
evt EventPayloadCheckServiceNode
req stream.SubscribeRequest
}{
"different key": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
},
stream.SubscribeRequest{
Key: "bar",
},
},
"different partition": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overridePartition: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
"different namespace": {
EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{
Service: &structs.NodeService{
Service: "foo",
},
},
overrideNamespace: "bar",
},
stream.SubscribeRequest{
Key: "foo",
},
},
} {
t.Run(desc, func(t *testing.T) {
require.NotEqual(t, tc.req.Subject(), tc.evt.Subject())
})
}
}
func TestServiceHealthSnapshot(t *testing.T) {
store := NewStateStore(nil)
@ -1771,7 +1866,7 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
// all events for a particular topic are grouped together. The sort is
// stable so events with the same key retain their relative order.
//
// This sort should match the logic in EventPayloadCheckServiceNode.MatchesKey
// This sort should match the logic in EventPayloadCheckServiceNode.Subject
// to avoid masking bugs.
var cmpPartialOrderEvents = cmp.Options{
cmpopts.SortSlices(func(i, j stream.Event) bool {
@ -2418,107 +2513,6 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string)
}
}
func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) {
type testCase struct {
name string
payload EventPayloadCheckServiceNode
key string
namespace string
partition string // TODO(partitions): create test cases for this being set
expected bool
}
fn := func(t *testing.T, tc testCase) {
if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" {
t.Skip("cant test namespace matching without namespace support")
}
require.Equal(t, tc.expected, tc.payload.MatchesKey(tc.key, tc.namespace, tc.partition))
}
var testCases = []testCase{
{
name: "no key or namespace",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
expected: true,
},
{
name: "no key, with namespace match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
namespace: "ns1",
expected: true,
},
{
name: "no namespace, with key match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv1",
expected: true,
},
{
name: "key match, namespace mismatch",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv1",
namespace: "ns2",
expected: false,
},
{
name: "key mismatch, namespace match",
payload: newPayloadCheckServiceNode("srv1", "ns1"),
key: "srv2",
namespace: "ns1",
expected: false,
},
{
name: "override key match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", ""),
key: "srv1",
namespace: "ns1",
expected: true,
},
{
name: "override key mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", ""),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns2"),
key: "proxy",
namespace: "ns2",
expected: true,
},
{
name: "override namespace mismatch",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
{
name: "override both key and namespace match",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv1", "ns2"),
key: "srv1",
namespace: "ns2",
expected: true,
},
{
name: "override both key and namespace mismatch namespace",
payload: newPayloadCheckServiceNodeWithOverride("proxy", "ns1", "srv2", "ns3"),
key: "proxy",
namespace: "ns1",
expected: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode {
return EventPayloadCheckServiceNode{
Value: &structs.CheckServiceNode{

View File

@ -419,26 +419,14 @@ type nodePayload struct {
node *structs.ServiceNode
}
func (p nodePayload) MatchesKey(key, _, partition string) bool {
if key == "" && partition == "" {
return true
}
if p.node == nil {
return false
}
if structs.PartitionOrDefault(partition) != p.node.PartitionOrDefault() {
return false
}
return p.key == key
}
func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
return true
}
func (p nodePayload) Subject() stream.Subject {
return stream.Subject(p.node.PartitionOrDefault() + "/" + p.node.NamespaceOrDefault() + "/" + p.key)
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",

View File

@ -14,6 +14,11 @@ import (
// events which match the Topic.
type Topic fmt.Stringer
// Subject identifies a portion of a topic for which a subscriber wishes to
// receive events (e.g. health events for a particular service) usually the
// normalized resource name (including partition and namespace if applicable).
type Subject string
// Event is a structure with identifiers and a payload. Events are Published to
// EventPublisher and returned to Subscribers.
type Event struct {
@ -26,18 +31,16 @@ type Event struct {
// should not modify the state of the payload if the Event is being submitted to
// EventPublisher.Publish.
type Payload interface {
// MatchesKey must return true if the Payload should be included in a
// subscription requested with the key, namespace, and partition.
//
// Generally this means that the payload matches the key, namespace, and
// partition or the payload is a special framing event that should be
// returned to every subscription.
MatchesKey(key, namespace, partition string) bool
// HasReadPermission uses the acl.Authorizer to determine if the items in the
// Payload are visible to the request. It returns true if the payload is
// authorized for Read, otherwise returns false.
HasReadPermission(authz acl.Authorizer) bool
// Subject is used to identify which subscribers should be notified of this
// event - e.g. those subscribing to health events for a particular service.
// it is usually the normalized resource name (including the partition and
// namespace if applicable).
Subject() Subject
}
// PayloadEvents is a Payload that may be returned by Subscription.Next when
@ -81,14 +84,6 @@ func (p *PayloadEvents) filter(f func(Event) bool) bool {
return true
}
// MatchesKey filters the PayloadEvents to those which match the key,
// namespace, and partition.
func (p *PayloadEvents) MatchesKey(key, namespace, partition string) bool {
return p.filter(func(event Event) bool {
return event.Payload.MatchesKey(key, namespace, partition)
})
}
func (p *PayloadEvents) Len() int {
return len(p.Items)
}
@ -101,6 +96,14 @@ func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool {
})
}
// Subject is required to satisfy the Payload interface but is not implemented
// by PayloadEvents. PayloadEvents structs are constructed by Subscription.Next
// *after* Subject has been used to dispatch the enclosed events to the correct
// buffer.
func (PayloadEvents) Subject() Subject {
panic("PayloadEvents does not implement Subject")
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Subsequent events from Subscription.Next will be
// streamed as they occur.
@ -117,12 +120,15 @@ func (e Event) IsNewSnapshotToFollow() bool {
type framingEvent struct{}
func (framingEvent) MatchesKey(string, string, string) bool {
func (framingEvent) HasReadPermission(acl.Authorizer) bool {
return true
}
func (framingEvent) HasReadPermission(acl.Authorizer) bool {
return true
// Subject is required by the Payload interface but is not implemented by
// framing events, as they are typically *manually* appended to the correct
// buffer and do not need to be routed using a Subject.
func (framingEvent) Subject() Subject {
panic("framing events do not implement Subject")
}
type endOfSnapshot struct {
@ -137,12 +143,15 @@ type closeSubscriptionPayload struct {
tokensSecretIDs []string
}
func (closeSubscriptionPayload) MatchesKey(string, string, string) bool {
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
return false
}
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
return false
// Subject is required by the Payload interface but it is not implemented by
// closeSubscriptionPayload, as this event type is handled separately and not
// actually appended to the buffer.
func (closeSubscriptionPayload) Subject() Subject {
panic("closeSubscriptionPayload does not implement Subject")
}
// NewCloseSubscriptionEvent returns a special Event that is handled by the

View File

@ -20,16 +20,16 @@ type EventPublisher struct {
// seconds.
snapCacheTTL time.Duration
// This lock protects the topicBuffers, and snapCache
// This lock protects the snapCache, topicBuffers and topicBuffer.refs.
lock sync.RWMutex
// topicBuffers stores the head of the linked-list buffer to publish events to
// topicBuffers stores the head of the linked-list buffers to publish events to
// for a topic.
topicBuffers map[Topic]*eventBuffer
topicBuffers map[topicSubject]*topicBuffer
// snapCache if a cache of EventSnapshots indexed by topic and key.
// snapCache if a cache of EventSnapshots indexed by topic and subject.
// TODO(streaming): new snapshotCache struct for snapCache and snapCacheTTL
snapCache map[Topic]map[string]*eventSnapshot
snapCache map[topicSubject]*eventSnapshot
subscriptions *subscriptions
@ -41,6 +41,13 @@ type EventPublisher struct {
snapshotHandlers SnapshotHandlers
}
// topicSubject is used as a map key when accessing topic buffers and cached
// snapshots.
type topicSubject struct {
Topic Topic
Subject Subject
}
type subscriptions struct {
// lock for byToken. If both subscription.lock and EventPublisher.lock need
// to be held, EventPublisher.lock MUST always be acquired first.
@ -54,6 +61,14 @@ type subscriptions struct {
byToken map[string]map[*SubscribeRequest]*Subscription
}
// topicBuffer augments the eventBuffer with a reference counter, enabling
// clean up of unused buffers once there are no longer any subscribers for
// the given topic and key.
type topicBuffer struct {
refs int // refs is guarded by EventPublisher.lock.
buf *eventBuffer
}
// SnapshotHandlers is a mapping of Topic to a function which produces a snapshot
// of events for the SubscribeRequest. Events are appended to the snapshot using SnapshotAppender.
// The nil Topic is reserved and should not be used.
@ -79,8 +94,8 @@ type SnapshotAppender interface {
func NewEventPublisher(handlers SnapshotHandlers, snapCacheTTL time.Duration) *EventPublisher {
e := &EventPublisher{
snapCacheTTL: snapCacheTTL,
topicBuffers: make(map[Topic]*eventBuffer),
snapCache: make(map[Topic]map[string]*eventSnapshot),
topicBuffers: make(map[topicSubject]*topicBuffer),
snapCache: make(map[topicSubject]*eventSnapshot),
publishCh: make(chan []Event, 64),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
@ -116,36 +131,59 @@ func (e *EventPublisher) Run(ctx context.Context) {
// publishEvent appends the events to any applicable topic buffers. It handles
// any closeSubscriptionPayload events by closing associated subscriptions.
func (e *EventPublisher) publishEvent(events []Event) {
eventsByTopic := make(map[Topic][]Event)
groupedEvents := make(map[topicSubject][]Event)
for _, event := range events {
if unsubEvent, ok := event.Payload.(closeSubscriptionPayload); ok {
e.subscriptions.closeSubscriptionsForTokens(unsubEvent.tokensSecretIDs)
continue
}
eventsByTopic[event.Topic] = append(eventsByTopic[event.Topic], event)
groupKey := topicSubject{event.Topic, event.Payload.Subject()}
groupedEvents[groupKey] = append(groupedEvents[groupKey], event)
}
e.lock.Lock()
defer e.lock.Unlock()
for topic, events := range eventsByTopic {
e.getTopicBuffer(topic).Append(events)
for groupKey, events := range groupedEvents {
// Note: bufferForPublishing returns nil if there are no subscribers for the
// given topic and subject, in which case events will be dropped on the floor and
// future subscribers will catch up by consuming the snapshot.
if buf := e.bufferForPublishing(groupKey); buf != nil {
buf.Append(events)
}
}
}
// getTopicBuffer for the topic. Creates a new event buffer if one does not
// already exist.
// bufferForSubscription returns the topic event buffer to which events for the
// given topic and key will be appended. If no such buffer exists, a new buffer
// will be created.
//
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getTopicBuffer(topic Topic) *eventBuffer {
buf, ok := e.topicBuffers[topic]
// Warning: e.lock MUST be held when calling this function.
func (e *EventPublisher) bufferForSubscription(key topicSubject) *topicBuffer {
buf, ok := e.topicBuffers[key]
if !ok {
buf = newEventBuffer()
e.topicBuffers[topic] = buf
buf = &topicBuffer{
buf: newEventBuffer(),
}
e.topicBuffers[key] = buf
}
return buf
}
// bufferForPublishing returns the event buffer to which events for the given
// topic and key should be appended. nil will be returned if there are no
// subscribers for the given topic and key.
//
// Warning: e.lock MUST be held when calling this function.
func (e *EventPublisher) bufferForPublishing(key topicSubject) *eventBuffer {
buf, ok := e.topicBuffers[key]
if !ok {
return nil
}
return buf.buf
}
// Subscribe returns a new Subscription for the given request. A subscription
// will receive an initial snapshot of events matching the request if req.Index > 0.
// After the snapshot, events will be streamed as they are created.
@ -163,7 +201,34 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
e.lock.Lock()
defer e.lock.Unlock()
topicHead := e.getTopicBuffer(req.Topic).Head()
topicBuf := e.bufferForSubscription(req.topicSubject())
topicBuf.refs++
// freeBuf is used to free the topic buffer once there are no remaining
// subscribers for the given topic and key.
//
// Note: it's called by Subcription.Unsubscribe which has its own side-effects
// that are made without holding e.lock (so there's a moment where the ref
// counter is inconsistent with the subscription map) — in practice this is
// fine, we don't need these things to be strongly consistent. The alternative
// would be to hold both locks, which introduces the risk of deadlocks.
freeBuf := func() {
e.lock.Lock()
defer e.lock.Unlock()
topicBuf.refs--
if topicBuf.refs == 0 {
delete(e.topicBuffers, req.topicSubject())
// Evict cached snapshot too because the topic buffer will have been spliced
// onto it. If we don't do this, any new subscribers started before the cache
// TTL is reached will get "stuck" waiting on the old buffer.
delete(e.snapCache, req.topicSubject())
}
}
topicHead := topicBuf.buf.Head()
// If the client view is fresh, resume the stream.
if req.Index > 0 && topicHead.HasEventIndex(req.Index) {
@ -173,7 +238,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
// the subscription will receive new events.
next, _ := topicHead.NextNoBlock()
buf.AppendItem(next)
return e.subscriptions.add(req, subscriptionHead), nil
return e.subscriptions.add(req, subscriptionHead, freeBuf), nil
}
snapFromCache := e.getCachedSnapshotLocked(req)
@ -186,7 +251,7 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
// If the request.Index is 0 the client has no view, send a full snapshot.
if req.Index == 0 {
return e.subscriptions.add(req, snapFromCache.First), nil
return e.subscriptions.add(req, snapFromCache.First, freeBuf), nil
}
// otherwise the request has an Index, the client view is stale and must be reset
@ -197,11 +262,17 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
Payload: newSnapshotToFollow{},
}})
result.buffer.AppendItem(snapFromCache.First)
return e.subscriptions.add(req, result.First), nil
return e.subscriptions.add(req, result.First, freeBuf), nil
}
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem) *Subscription {
sub := newSubscription(*req, head, s.unsubscribe(req))
func (s *subscriptions) add(req *SubscribeRequest, head *bufferItem, freeBuf func()) *Subscription {
// We wrap freeBuf in a sync.Once as it's expected that Subscription.unsub is
// idempotent, but freeBuf decrements the reference counter on every call.
var once sync.Once
sub := newSubscription(*req, head, func() {
s.unsubscribe(req)
once.Do(freeBuf)
})
s.lock.Lock()
defer s.lock.Unlock()
@ -228,24 +299,17 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
}
}
// unsubscribe returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the
// wrong pointer.
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
return func() {
s.lock.Lock()
defer s.lock.Unlock()
func (s *subscriptions) unsubscribe(req *SubscribeRequest) {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
}
@ -262,13 +326,7 @@ func (s *subscriptions) closeAll() {
// EventPublisher.lock must be held to call this method.
func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSnapshot {
topicSnaps, ok := e.snapCache[req.Topic]
if !ok {
topicSnaps = make(map[string]*eventSnapshot)
e.snapCache[req.Topic] = topicSnaps
}
snap, ok := topicSnaps[snapCacheKey(req)]
snap, ok := e.snapCache[req.topicSubject()]
if ok && snap.err() == nil {
return snap
}
@ -280,16 +338,12 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev
if e.snapCacheTTL == 0 {
return
}
e.snapCache[req.Topic][snapCacheKey(req)] = snap
e.snapCache[req.topicSubject()] = snap
// Setup a cache eviction
time.AfterFunc(e.snapCacheTTL, func() {
e.lock.Lock()
defer e.lock.Unlock()
delete(e.snapCache[req.Topic], snapCacheKey(req))
delete(e.snapCache, req.topicSubject())
})
}
func snapCacheKey(req *SubscribeRequest) string {
return req.Partition + "/" + req.Namespace + "/" + req.Key
}

View File

@ -56,6 +56,13 @@ func TestEventPublisher_SubscribeWithIndex0(t *testing.T) {
Payload: simplePayload{key: "sub-key", value: "the-published-event-payload"},
}
require.Equal(t, expected, next)
// Subscriber should not see events for other keys
publisher.Publish([]Event{{
Topic: testTopic,
Payload: simplePayload{key: "other-key", value: "this-should-not-reach-the-subscriber"},
}})
assertNoResult(t, eventCh)
}
var testSnapshotEvent = Event{
@ -70,17 +77,12 @@ type simplePayload struct {
noReadPerm bool
}
func (p simplePayload) MatchesKey(key, _, _ string) bool {
if key == "" {
return true
}
return p.key == key
}
func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
return !p.noReadPerm
}
func (p simplePayload) Subject() Subject { return Subject("default/default/" + p.key) }
func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {
@ -190,9 +192,10 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
sub, err := publisher.Subscribe(req)
require.NoError(t, err)
sub.Unsubscribe()
defer sub.Unsubscribe()
publisher.snapshotHandlers[testTopic] = func(_ SubscribeRequest, _ SnapshotAppender) (uint64, error) {
return 0, fmt.Errorf("error should not be seen, cache should have been used")
@ -200,6 +203,7 @@ func TestEventPublisher_SubscribeWithIndex0_FromCache(t *testing.T) {
sub, err = publisher.Subscribe(req)
require.NoError(t, err)
defer sub.Unsubscribe()
eventCh := runSubscription(ctx, sub)
next := getNextEvent(t, eventCh)
@ -233,7 +237,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_CanResume(t *testing.T) {
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
simulateExistingSubscriber(t, publisher, req)
// Publish the testSnapshotEvent, to ensure that it is skipped over when
// splicing the topic buffer onto the snapshot.
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
@ -338,7 +346,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshotFromCache(t *testin
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
go publisher.Run(ctx)
// Include the same event in the topicBuffer
simulateExistingSubscriber(t, publisher, req)
// Publish the testSnapshotEvent, to ensure that it is skipped over when
// splicing the topic buffer onto the snapshot.
publisher.publishEvent([]Event{testSnapshotEvent})
runStep(t, "start a subscription and unsub", func(t *testing.T) {
@ -421,7 +433,11 @@ func TestEventPublisher_SubscribeWithIndexNotZero_NewSnapshot_WithCache(t *testi
publisher := NewEventPublisher(handlers, time.Second)
go publisher.Run(ctx)
// Include the same events in the topicBuffer
simulateExistingSubscriber(t, publisher, req)
// Publish the events, to ensure they are is skipped over when splicing the
// topic buffer onto the snapshot.
publisher.publishEvent([]Event{testSnapshotEvent})
publisher.publishEvent([]Event{nextEvent})
@ -495,3 +511,60 @@ func TestEventPublisher_Unsubscribe_ClosesSubscription(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "subscription was closed by unsubscribe")
}
func TestEventPublisher_Unsubscribe_FreesResourcesWhenThereAreNoSubscribers(t *testing.T) {
req := &SubscribeRequest{
Topic: testTopic,
Key: "sub-key",
}
publisher := NewEventPublisher(newTestSnapshotHandlers(), time.Second)
sub1, err := publisher.Subscribe(req)
require.NoError(t, err)
// Expect a topic buffer and snapshot to have been created.
publisher.lock.Lock()
require.NotNil(t, publisher.topicBuffers[req.topicSubject()])
require.NotNil(t, publisher.snapCache[req.topicSubject()])
publisher.lock.Unlock()
// Create another subscription and close the old one, to ensure the buffer and
// snapshot stick around as long as there's at least one subscriber.
sub2, err := publisher.Subscribe(req)
require.NoError(t, err)
sub1.Unsubscribe()
publisher.lock.Lock()
require.NotNil(t, publisher.topicBuffers[req.topicSubject()])
require.NotNil(t, publisher.snapCache[req.topicSubject()])
publisher.lock.Unlock()
// Close the other subscription and expect the buffer and snapshot to have
// been cleaned up.
sub2.Unsubscribe()
publisher.lock.Lock()
require.Nil(t, publisher.topicBuffers[req.topicSubject()])
require.Nil(t, publisher.snapCache[req.topicSubject()])
publisher.lock.Unlock()
}
// simulateExistingSubscriber creates a subscription that remains open throughout
// a test to prevent the topic buffer getting garbage-collected.
//
// It evicts the created snapshot from the cache immediately (simulating an
// existing subscription that has been open long enough the snapshot's TTL has
// been reached) so you can test snapshots getting created afresh.
func simulateExistingSubscriber(t *testing.T, p *EventPublisher, r *SubscribeRequest) {
t.Helper()
sub, err := p.Subscribe(r)
require.NoError(t, err)
t.Cleanup(sub.Unsubscribe)
p.lock.Lock()
delete(p.snapCache, r.topicSubject())
p.lock.Unlock()
}

View File

@ -20,119 +20,6 @@ func newSimpleEvent(key string, index uint64) Event {
return Event{Index: index, Payload: simplePayload{key: key}}
}
func TestPayloadEvents_FilterByKey(t *testing.T) {
type testCase struct {
name string
req SubscribeRequest
events []Event
expectEvent bool
expected *PayloadEvents
expectedCap int
}
fn := func(t *testing.T, tc testCase) {
events := make([]Event, 0, 5)
events = append(events, tc.events...)
pe := &PayloadEvents{Items: events}
ok := pe.MatchesKey(tc.req.Key, tc.req.Namespace, tc.req.Partition)
require.Equal(t, tc.expectEvent, ok)
if !tc.expectEvent {
return
}
require.Equal(t, tc.expected, pe)
// test if there was a new array allocated or not
require.Equal(t, tc.expectedCap, cap(pe.Items))
}
var testCases = []testCase{
{
name: "all events match, no key or namespace",
req: SubscribeRequest{Topic: testTopic},
events: []Event{
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)},
expectEvent: true,
expected: newPayloadEvents(
newSimpleEvent("One", 102),
newSimpleEvent("Two", 102)),
expectedCap: 5,
},
{
name: "all events match, no namespace",
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
events: []Event{
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)},
expectEvent: true,
expected: newPayloadEvents(
newSimpleEvent("Same", 103),
newSimpleEvent("Same", 103)),
expectedCap: 5,
},
{
name: "all events match, no key",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")},
expectEvent: true,
expected: newPayloadEvents(
newNSEvent("Something", "apps"),
newNSEvent("Other", "apps")),
expectedCap: 5,
},
{
name: "some evens match, no namespace",
req: SubscribeRequest{Topic: testTopic, Key: "Same"},
events: []Event{
newSimpleEvent("Same", 104),
newSimpleEvent("Other", 104),
newSimpleEvent("Same", 104)},
expectEvent: true,
expected: newPayloadEvents(
newSimpleEvent("Same", 104),
newSimpleEvent("Same", 104)),
expectedCap: 2,
},
{
name: "some events match, no key",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("app1", "apps"),
newNSEvent("db1", "dbs"),
newNSEvent("app2", "apps")},
expectEvent: true,
expected: newPayloadEvents(
newNSEvent("app1", "apps"),
newNSEvent("app2", "apps")),
expectedCap: 2,
},
{
name: "no events match key",
req: SubscribeRequest{Topic: testTopic, Key: "Other"},
events: []Event{
newSimpleEvent("Same", 0),
newSimpleEvent("Same", 0)},
},
{
name: "no events match namespace",
req: SubscribeRequest{Topic: testTopic, Namespace: "apps"},
events: []Event{
newNSEvent("app1", "group1"),
newNSEvent("app2", "group2")},
expectEvent: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fn(t, tc)
})
}
}
// TODO(partitions)
func newNSEvent(key, namespace string) Event {
return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}}
@ -146,12 +33,6 @@ type nsPayload struct {
value string
}
func (p nsPayload) MatchesKey(key, namespace, partition string) bool {
return (key == "" || key == p.key) &&
(namespace == "" || namespace == p.namespace) &&
(partition == "" || partition == p.partition)
}
func TestPayloadEvents_HasReadPermission(t *testing.T) {
t.Run("some events filtered", func(t *testing.T) {
ep := newPayloadEvents(

View File

@ -4,7 +4,10 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync/atomic"
"github.com/hashicorp/consul/agent/structs"
)
const (
@ -59,12 +62,9 @@ type SubscribeRequest struct {
// be returned by the subscription. A blank key will return all events. Key
// is generally the name of the resource.
Key string
// Namespace used to filter events in the topic. Only events matching the
// namespace will be returned by the subscription.
Namespace string
// Partition used to filter events in the topic. Only events matching the
// partition will be returned by the subscription.
Partition string // TODO(partitions): make this work
// EnterpriseMeta is used to filter events in the topic. Only events matching
// the partition and namespace will be returned by the subscription.
EnterpriseMeta structs.EnterpriseMeta
// Token that was used to authenticate the request. If any ACL policy
// changes impact the token the subscription will be forcefully closed.
Token string
@ -74,6 +74,19 @@ type SubscribeRequest struct {
Index uint64
}
func (req SubscribeRequest) Subject() Subject {
var (
partition = req.EnterpriseMeta.PartitionOrDefault()
namespace = req.EnterpriseMeta.NamespaceOrDefault()
key = strings.ToLower(req.Key)
)
return Subject(partition + "/" + namespace + "/" + key)
}
func (req SubscribeRequest) topicSubject() topicSubject {
return topicSubject{req.Topic, req.Subject()}
}
// newSubscription return a new subscription. The caller is responsible for
// calling Unsubscribe when it is done with the subscription, to free resources.
func newSubscription(req SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
@ -104,11 +117,7 @@ func (s *Subscription) Next(ctx context.Context) (Event, error) {
if len(next.Events) == 0 {
continue
}
event := newEventFromBatch(s.req, next.Events)
if !event.Payload.MatchesKey(s.req.Key, s.req.Namespace, s.req.Partition) {
continue
}
return event, nil
return newEventFromBatch(s.req, next.Events), nil
}
}

View File

@ -6,10 +6,32 @@ import (
time "time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func noopUnSub() {}
func TestSubscription_Subject(t *testing.T) {
for desc, tc := range map[string]struct {
req SubscribeRequest
sub Subject
}{
"default partition and namespace": {
SubscribeRequest{Key: "foo", EnterpriseMeta: structs.EnterpriseMeta{}},
"default/default/foo",
},
"mixed casing": {
SubscribeRequest{Key: "BaZ"},
"default/default/baz",
},
} {
t.Run(desc, func(t *testing.T) {
require.Equal(t, tc.sub, tc.req.Subject())
})
}
}
func TestSubscription(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -59,10 +81,6 @@ func TestSubscription(t *testing.T) {
"Event should have been delivered after short time, took %s", elapsed)
require.Equal(t, index, got.Index)
// Event with wrong key should not be delivered. Deliver a good message right
// so we don't have to block test thread forever or cancel func yet.
index++
publishTestEvent(index, eb, "nope")
index++
publishTestEvent(index, eb, "test")

View File

@ -57,6 +57,10 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
return err
}
if req.Key == "" {
return status.Error(codes.InvalidArgument, "Key is required")
}
sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta))
if err != nil {
return err
@ -89,12 +93,11 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub
func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest {
return &stream.SubscribeRequest{
Topic: req.Topic,
Key: req.Key,
Token: req.Token,
Index: req.Index,
Namespace: entMeta.NamespaceOrEmpty(),
Partition: entMeta.PartitionOrEmpty(),
Topic: req.Topic,
Key: req.Key,
EnterpriseMeta: entMeta,
Token: req.Token,
Index: req.Index,
}
}

View File

@ -30,6 +30,33 @@ import (
"github.com/hashicorp/consul/types"
)
func TestServer_Subscribe_KeyIsRequired(t *testing.T) {
backend, err := newTestBackend()
require.NoError(t, err)
addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
require.NoError(t, err)
t.Cleanup(logError(t, conn.Close))
client := pbsubscribe.NewStateChangeSubscriptionClient(conn)
stream, err := client.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: "",
})
require.NoError(t, err)
_, err = stream.Recv()
require.Error(t, err)
require.Equal(t, codes.InvalidArgument.String(), status.Code(err).String())
require.Contains(t, err.Error(), "Key is required")
}
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
backend, err := newTestBackend()
require.NoError(t, err)
@ -878,6 +905,8 @@ func assertNoEvents(t *testing.T, chEvents chan eventOrError) {
func logError(t *testing.T, f func() error) func() {
return func() {
t.Helper()
if err := f(); err != nil {
t.Logf(err.Error())
}

View File

@ -90,9 +90,8 @@ type SubscribeRequest struct {
Topic Topic `protobuf:"varint,1,opt,name=Topic,proto3,enum=subscribe.Topic" json:"Topic,omitempty"`
// Key is a topic-specific identifier that restricts the scope of the
// subscription to only events pertaining to that identifier. For example,
// to receive events for a single service, the service's name is
// specified as the key. An empty key indicates that all events in the topic
// are of interest.
// to receive events for a single service, the service's name is specified
// as the key.
Key string `protobuf:"bytes,2,opt,name=Key,proto3" json:"Key,omitempty"`
// Token is the ACL token to authenticate the request. The token must have
// sufficient privileges to read the requested information otherwise events

View File

@ -51,9 +51,8 @@ message SubscribeRequest {
// Key is a topic-specific identifier that restricts the scope of the
// subscription to only events pertaining to that identifier. For example,
// to receive events for a single service, the service's name is
// specified as the key. An empty key indicates that all events in the topic
// are of interest.
// to receive events for a single service, the service's name is specified
// as the key.
string Key = 2;
// Token is the ACL token to authenticate the request. The token must have