stream: Add HasReadPermission to Payload

Required now that filter is a method on PayloadEvents instead of Event
pull/9114/head
Daniel Nephin 2020-11-05 17:57:25 -05:00
parent 36202f7938
commit 868cfe1eac
9 changed files with 99 additions and 45 deletions

View File

@ -3,6 +3,8 @@ package state
import ( import (
memdb "github.com/hashicorp/go-memdb" memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbsubscribe" "github.com/hashicorp/consul/proto/pbsubscribe"
@ -19,6 +21,10 @@ type EventPayloadCheckServiceNode struct {
key string key string
} }
func (e EventPayloadCheckServiceNode) HasReadPermission(authz acl.Authorizer) bool {
return e.Value.CanRead(authz) == acl.Allow
}
func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool {
if key == "" && namespace == "" { if key == "" && namespace == "" {
return true return true

View File

@ -414,6 +414,10 @@ func (p nodePayload) FilterByKey(key, _ string) bool {
return p.key == key return p.key == key
} }
func (p nodePayload) HasReadPermission(acl.Authorizer) bool {
return true
}
func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken { func createTokenAndWaitForACLEventPublish(t *testing.T, s *Store) *structs.ACLToken {
token := &structs.ACLToken{ token := &structs.ACLToken{
AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4", AccessorID: "3af117a9-2233-4cf4-8ff8-3c749c9906b4",

View File

@ -4,7 +4,11 @@ to the state store.
*/ */
package stream package stream
import "fmt" import (
"fmt"
"github.com/hashicorp/consul/acl"
)
// Topic is an identifier that partitions events. A subscription will only receive // Topic is an identifier that partitions events. A subscription will only receive
// events which match the Topic. // events which match the Topic.
@ -26,6 +30,11 @@ type Payload interface {
// subscription. // subscription.
// TODO: rename to MatchesKey // TODO: rename to MatchesKey
FilterByKey(key, namespace string) bool FilterByKey(key, namespace string) bool
// HasReadPermission uses the acl.Authorizer to determine if the items in the
// Payload are visible to the request. It returns true if the payload is
// authorized for Read, otherwise returns false.
HasReadPermission(authz acl.Authorizer) bool
} }
// PayloadEvents is an Payload which contains multiple Events. // PayloadEvents is an Payload which contains multiple Events.
@ -75,6 +84,12 @@ func (p *PayloadEvents) Len() int {
return len(p.Items) return len(p.Items)
} }
func (p *PayloadEvents) HasReadPermission(authz acl.Authorizer) bool {
return p.filter(func(event Event) bool {
return event.Payload.HasReadPermission(authz)
})
}
// IsEndOfSnapshot returns true if this is a framing event that indicates the // IsEndOfSnapshot returns true if this is a framing event that indicates the
// snapshot has completed. Subsequent events from Subscription.Next will be // snapshot has completed. Subsequent events from Subscription.Next will be
// streamed as they occur. // streamed as they occur.
@ -89,24 +104,34 @@ func (e Event) IsNewSnapshotToFollow() bool {
return e.Payload == newSnapshotToFollow{} return e.Payload == newSnapshotToFollow{}
} }
type endOfSnapshot struct{} type framingEvent struct{}
func (endOfSnapshot) FilterByKey(string, string) bool { func (framingEvent) FilterByKey(string, string) bool {
return true return true
} }
type newSnapshotToFollow struct{} func (framingEvent) HasReadPermission(acl.Authorizer) bool {
func (newSnapshotToFollow) FilterByKey(string, string) bool {
return true return true
} }
type endOfSnapshot struct {
framingEvent
}
type newSnapshotToFollow struct {
framingEvent
}
type closeSubscriptionPayload struct { type closeSubscriptionPayload struct {
tokensSecretIDs []string tokensSecretIDs []string
} }
func (closeSubscriptionPayload) FilterByKey(string, string) bool { func (closeSubscriptionPayload) FilterByKey(string, string) bool {
return true return false
}
func (closeSubscriptionPayload) HasReadPermission(acl.Authorizer) bool {
return false
} }
// NewCloseSubscriptionEvent returns a special Event that is handled by the // NewCloseSubscriptionEvent returns a special Event that is handled by the

View File

@ -7,6 +7,8 @@ import (
"time" "time"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
) )
type intTopic int type intTopic int
@ -63,8 +65,9 @@ var testSnapshotEvent = Event{
} }
type simplePayload struct { type simplePayload struct {
key string key string
value string value string
noReadPerm bool
} }
func (p simplePayload) FilterByKey(key, _ string) bool { func (p simplePayload) FilterByKey(key, _ string) bool {
@ -74,6 +77,10 @@ func (p simplePayload) FilterByKey(key, _ string) bool {
return p.key == key return p.key == key
} }
func (p simplePayload) HasReadPermission(acl.Authorizer) bool {
return !p.noReadPerm
}
func newTestSnapshotHandlers() SnapshotHandlers { func newTestSnapshotHandlers() SnapshotHandlers {
return SnapshotHandlers{ return SnapshotHandlers{
testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) { testTopic: func(req SubscribeRequest, buf SnapshotAppender) (uint64, error) {

View File

@ -138,6 +138,7 @@ func newNSEvent(key, namespace string) Event {
} }
type nsPayload struct { type nsPayload struct {
framingEvent
key string key string
namespace string namespace string
value string value string
@ -146,3 +147,31 @@ type nsPayload struct {
func (p nsPayload) FilterByKey(key, namespace string) bool { func (p nsPayload) FilterByKey(key, namespace string) bool {
return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace)
} }
func TestPayloadEvents_HasReadPermission(t *testing.T) {
t.Run("some events filtered", func(t *testing.T) {
ep := NewPayloadEvents(
Event{Payload: simplePayload{key: "one", noReadPerm: true}},
Event{Payload: simplePayload{key: "two", noReadPerm: false}},
Event{Payload: simplePayload{key: "three", noReadPerm: true}},
Event{Payload: simplePayload{key: "four", noReadPerm: false}})
require.True(t, ep.HasReadPermission(nil))
expected := []Event{
{Payload: simplePayload{key: "two"}},
{Payload: simplePayload{key: "four"}},
}
require.Equal(t, expected, ep.Items)
})
t.Run("all events filtered", func(t *testing.T) {
ep := NewPayloadEvents(
Event{Payload: simplePayload{key: "one", noReadPerm: true}},
Event{Payload: simplePayload{key: "two", noReadPerm: true}},
Event{Payload: simplePayload{key: "three", noReadPerm: true}},
Event{Payload: simplePayload{key: "four", noReadPerm: true}})
require.False(t, ep.HasReadPermission(nil))
})
}

View File

@ -1,22 +0,0 @@
package subscribe
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
)
// EnforceACL takes an acl.Authorizer and returns the decision for whether the
// event is allowed to be sent to this client or not.
func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision {
switch {
case e.IsEndOfSnapshot(), e.IsNewSnapshotToFollow():
return acl.Allow
}
switch p := e.Payload.(type) {
case state.EventPayloadCheckServiceNode:
return p.Value.CanRead(authz)
}
return acl.Deny
}

View File

@ -58,12 +58,20 @@ func (l *eventLogger) Trace(e stream.Event) {
case e.IsEndOfSnapshot(): case e.IsEndOfSnapshot():
l.snapshotDone = true l.snapshotDone = true
l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count) l.logger.Trace("snapshot complete", "index", e.Index, "sent", l.count)
return
case e.IsNewSnapshotToFollow(): case e.IsNewSnapshotToFollow():
l.logger.Trace("starting new snapshot", "sent", l.count) l.logger.Trace("starting new snapshot", "sent", l.count)
return return
case l.snapshotDone:
l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", e.Len())
} }
l.count += uint64(e.Len()) size := 1
if l, ok := e.Payload.(length); ok {
size = l.Len()
}
l.logger.Trace("sending events", "index", e.Index, "sent", l.count, "batch_size", size)
l.count += uint64(size)
}
type length interface {
Len() int
} }

View File

@ -132,10 +132,8 @@ func filterByAuth(authz acl.Authorizer, event stream.Event) (stream.Event, bool)
if authz == nil { if authz == nil {
return event, true return event, true
} }
fn := func(e stream.Event) bool {
return enforceACL(authz, e) == acl.Allow return event, event.Payload.HasReadPermission(authz)
}
return event.Filter(fn)
} }
func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event { func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
@ -154,10 +152,10 @@ func newEventFromStreamEvent(event stream.Event) *pbsubscribe.Event {
func setPayload(e *pbsubscribe.Event, payload stream.Payload) { func setPayload(e *pbsubscribe.Event, payload stream.Payload) {
switch p := payload.(type) { switch p := payload.(type) {
case stream.PayloadEvents: case *stream.PayloadEvents:
e.Payload = &pbsubscribe.Event_EventBatch{ e.Payload = &pbsubscribe.Event_EventBatch{
EventBatch: &pbsubscribe.EventBatch{ EventBatch: &pbsubscribe.EventBatch{
Events: batchEventsFromEventSlice(p), Events: batchEventsFromEventSlice(p.Items),
}, },
} }
case state.EventPayloadCheckServiceNode: case state.EventPayloadCheckServiceNode:

View File

@ -917,8 +917,8 @@ func TestNewEventFromSteamEvent(t *testing.T) {
name: "event batch", name: "event batch",
event: stream.Event{ event: stream.Event{
Index: 2002, Index: 2002,
Payload: stream.PayloadEvents{ Payload: stream.NewPayloadEvents(
{ stream.Event{
Index: 2002, Index: 2002,
Payload: state.EventPayloadCheckServiceNode{ Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Register, Op: pbsubscribe.CatalogOp_Register,
@ -928,7 +928,7 @@ func TestNewEventFromSteamEvent(t *testing.T) {
}, },
}, },
}, },
{ stream.Event{
Index: 2002, Index: 2002,
Payload: state.EventPayloadCheckServiceNode{ Payload: state.EventPayloadCheckServiceNode{
Op: pbsubscribe.CatalogOp_Deregister, Op: pbsubscribe.CatalogOp_Deregister,
@ -937,8 +937,7 @@ func TestNewEventFromSteamEvent(t *testing.T) {
Service: &structs.NodeService{Service: "web1"}, Service: &structs.NodeService{Service: "web1"},
}, },
}, },
}, }),
},
}, },
expected: pbsubscribe.Event{ expected: pbsubscribe.Event{
Index: 2002, Index: 2002,