From 868cfe1eac118b258f2c7645e9c3be2b89edf073 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 5 Nov 2020 17:57:25 -0500 Subject: [PATCH] stream: Add HasReadPermission to Payload Required now that filter is a method on PayloadEvents instead of Event --- agent/consul/state/catalog_events.go | 6 +++ agent/consul/state/store_integration_test.go | 4 ++ agent/consul/stream/event.go | 39 ++++++++++++++++---- agent/consul/stream/event_publisher_test.go | 11 +++++- agent/consul/stream/event_test.go | 29 +++++++++++++++ agent/rpc/subscribe/auth.go | 22 ----------- agent/rpc/subscribe/logger.go | 14 +++++-- agent/rpc/subscribe/subscribe.go | 10 ++--- agent/rpc/subscribe/subscribe_test.go | 9 ++--- 9 files changed, 99 insertions(+), 45 deletions(-) delete mode 100644 agent/rpc/subscribe/auth.go diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index 0a11a94360..2a410d299b 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -3,6 +3,8 @@ package state import ( memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbsubscribe" @@ -19,6 +21,10 @@ type EventPayloadCheckServiceNode struct { key string } +func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool { + return e.Value.CanRead(authz) == acl.Allow +} + func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { if key == "" && namespace == "" { return true diff --git a/agent/consul/state/store_integration_test.go b/agent/consul/state/store_integration_test.go index d75512195e..5209138078 100644 --- a/agent/consul/state/store_integration_test.go +++ b/agent/consul/state/store_integration_test.go @@ -414,6 +414,10 @@ func (p nodePayload) FilterByKey(key, _ string) bool { return p.key == key } +func (p nodePayload) HasReadPermission(acl.Authorizer) bool { + return true +} + func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { token := &structs.ACLToken{ AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", diff --git a/agent/consul/stream/event.go b/agent/consul/stream/event.go index 09369920bf..16a33fca70 100644 --- a/agent/consul/stream/event.go +++ b/agent/consul/stream/event.go @@ -4,7 +4,11 @@ to the state store. */ package stream -import "fmt" +import ( + "fmt" + + "github.com/hashicorp/consul/acl" +) // Topic is an identifier that partitions events. A subscription will only receive // events which match the Topic. @@ -26,6 +30,11 @@ type Payload interface { // subscription. // TODO: rename to MatchesKey FilterByKey(key, namespace string) bool + + // HasReadPermission uses the acl.Authorizer to determine if the items in the + // Payload are visible to the request. It returns true if the payload is + // authorized for Read, otherwise returns false. + HasReadPermission(authz acl.Authorizer) bool } // PayloadEvents is an Payload which contains multiple Events. @@ -75,6 +84,12 @@ func (p *PayloadEvents) Len() int { return len(p.Items) } +func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool { + return p.filter(func(event Event) bool { + return event.Payload.HasReadPermission(authz) + }) +} + // IsEndOfSnapshot returns true if this is a framing event that indicates the // snapshot has completed. Subsequent events from Subscription.Next will be // streamed as they occur. @@ -89,24 +104,34 @@ func (e Event) IsNewSnapshotToFollow() bool { return e.Payload == newSnapshotToFollow{} } -type endOfSnapshot struct{} +type framingEvent struct{} -func (endOfSnapshot) FilterByKey(string, string) bool { +func (framingEvent) FilterByKey(string, string) bool { return true } -type newSnapshotToFollow struct{} - -func (newSnapshotToFollow) FilterByKey(string, string) bool { +func (framingEvent) HasReadPermission(acl.Authorizer) bool { return true } +type endOfSnapshot struct { + framingEvent +} + +type newSnapshotToFollow struct { + framingEvent +} + type closeSubscriptionPayload struct { tokensSecretIDs []string } func (closeSubscriptionPayload) FilterByKey(string, string) bool { - return true + return false +} + +func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool { + return false } // NewCloseSubscriptionEvent returns a special Event that is handled by the diff --git a/agent/consul/stream/event_publisher_test.go b/agent/consul/stream/event_publisher_test.go index f2a9e43a36..c9f91fd3d3 100644 --- a/agent/consul/stream/event_publisher_test.go +++ b/agent/consul/stream/event_publisher_test.go @@ -7,6 +7,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" ) type intTopic int @@ -63,8 +65,9 @@ var testSnapshotEvent = Event{ } type simplePayload struct { - key string - value string + key string + value string + noReadPerm bool } func (p simplePayload) FilterByKey(key, _ string) bool { @@ -74,6 +77,10 @@ func (p simplePayload) FilterByKey(key, _ string) bool { return p.key == key } +func (p simplePayload) HasReadPermission(acl.Authorizer) bool { + return !p.noReadPerm +} + func newTestSnapshotHandlers() SnapshotHandlers { return SnapshotHandlers{ testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { diff --git a/agent/consul/stream/event_test.go b/agent/consul/stream/event_test.go index f3daba8021..5a9cccea82 100644 --- a/agent/consul/stream/event_test.go +++ b/agent/consul/stream/event_test.go @@ -138,6 +138,7 @@ func newNSEvent(key, namespace string) Event { } type nsPayload struct { + framingEvent key string namespace string value string @@ -146,3 +147,31 @@ type nsPayload struct { func (p nsPayload) FilterByKey(key, namespace string) bool { return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) } + +func TestPayloadEvents_HasReadPermission(t *testing.T) { + t.Run("some events filtered", func(t *testing.T) { + ep := NewPayloadEvents( + Event{Payload: simplePayload{key: "one", noReadPerm: true}}, + Event{Payload: simplePayload{key: "two", noReadPerm: false}}, + Event{Payload: simplePayload{key: "three", noReadPerm: true}}, + Event{Payload: simplePayload{key: "four", noReadPerm: false}}) + + require.True(t, ep.HasReadPermission(nil)) + expected := []Event{ + {Payload: simplePayload{key: "two"}}, + {Payload: simplePayload{key: "four"}}, + } + require.Equal(t, expected, ep.Items) + }) + + t.Run("all events filtered", func(t *testing.T) { + ep := NewPayloadEvents( + Event{Payload: simplePayload{key: "one", noReadPerm: true}}, + Event{Payload: simplePayload{key: "two", noReadPerm: true}}, + Event{Payload: simplePayload{key: "three", noReadPerm: true}}, + Event{Payload: simplePayload{key: "four", noReadPerm: true}}) + + require.False(t, ep.HasReadPermission(nil)) + }) + +} diff --git a/agent/rpc/subscribe/auth.go b/agent/rpc/subscribe/auth.go deleted file mode 100644 index b41b1fdc40..0000000000 --- a/agent/rpc/subscribe/auth.go +++ /dev/null @@ -1,22 +0,0 @@ -package subscribe - -import ( - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/consul/stream" -) - -// EnforceACL takes an acl.Authorizer and returns the decision for whether the -// event is allowed to be sent to this client or not. -func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision { - switch { - case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow(): - return acl.Allow - } - - switch p := e.Payload.(type) { - case state.EventPayloadCheckServiceNode: - return p.Value.CanRead(authz) - } - return acl.Deny -} diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go index ddddb20ca5..99394f5465 100644 --- a/agent/rpc/subscribe/logger.go +++ b/agent/rpc/subscribe/logger.go @@ -58,12 +58,20 @@ func (l *eventLogger) Trace(e stream.Event) { case e.IsEndOfSnapshot(): l.snapshotDone = true l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count) + return case e.IsNewSnapshotToFollow(): l.logger.Trace("starting new snapshot", "sent", l.count) return - case l.snapshotDone: - l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len()) } - l.count += uint64(e.Len()) + size := 1 + if l, ok := e.Payload.(length); ok { + size = l.Len() + } + l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", size) + l.count += uint64(size) +} + +type length interface { + Len() int } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 71919babab..0e98893f92 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -132,10 +132,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool) if authz == nil { return event, true } - fn := func(e stream.Event) bool { - return enforceACL(authz, e) == acl.Allow - } - return event.Filter(fn) + + return event, event.Payload.HasReadPermission(authz) } func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { @@ -154,10 +152,10 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { func setPayload(e *pbsubscribe.Event, payload stream.Payload) { switch p := payload.(type) { - case stream.PayloadEvents: + case *stream.PayloadEvents: e.Payload = &pbsubscribe.Event_EventBatch{ EventBatch: &pbsubscribe.EventBatch{ - Events: batchEventsFromEventSlice(p), + Events: batchEventsFromEventSlice(p.Items), }, } case state.EventPayloadCheckServiceNode: diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index bc41ed1e88..fe3a073252 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -917,8 +917,8 @@ func TestNewEventFromSteamEvent(t *testing.T) { name: "event batch", event: stream.Event{ Index: 2002, - Payload: stream.PayloadEvents{ - { + Payload: stream.NewPayloadEvents( + stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Register, @@ -928,7 +928,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { }, }, }, - { + stream.Event{ Index: 2002, Payload: state.EventPayloadCheckServiceNode{ Op: pbsubscribe.CatalogOp_Deregister, @@ -937,8 +937,7 @@ func TestNewEventFromSteamEvent(t *testing.T) { Service: &structs.NodeService{Service: "web1"}, }, }, - }, - }, + }), }, expected: pbsubscribe.Event{ Index: 2002,