diff --git a/agent/rpc/subscribe/auth.go b/agent/rpc/subscribe/auth.go index 094ed4e3bf..b41b1fdc40 100644 --- a/agent/rpc/subscribe/auth.go +++ b/agent/rpc/subscribe/auth.go @@ -16,25 +16,7 @@ func enforceACL(authz acl.Authorizer, e stream.Event) acl.EnforcementDecision { switch p := e.Payload.(type) { case state.EventPayloadCheckServiceNode: - csn := p.Value - if csn.Node == nil || csn.Service == nil || csn.Node.Node == "" || csn.Service.Service == "" { - return acl.Deny - } - - // TODO: what about acl.Default? - // TODO(streaming): we need the AuthorizerContext for ent - if dec := authz.NodeRead(csn.Node.Node, nil); dec != acl.Allow { - return acl.Deny - } - - // TODO(streaming): we need the AuthorizerContext for ent - // Enterprise support for streaming events - they don't have enough data to - // populate it yet. - if dec := authz.ServiceRead(csn.Service.Service, nil); dec != acl.Allow { - return acl.Deny - } - return acl.Allow + return p.Value.CanRead(authz) } - return acl.Deny } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 981c1714b0..bcf87460e1 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -35,6 +35,8 @@ type Logger interface { var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil) type Backend interface { + // TODO(streaming): Use ResolveTokenAndDefaultMeta instead once SubscribeRequest + // has an EnterpriseMeta. ResolveToken(token string) (acl.Authorizer, error) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) @@ -51,7 +53,6 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub defer logger.Trace("subscription closed") // Resolve the token and create the ACL filter. - // TODO(streaming): handle token expiry gracefully... authz, err := h.Backend.ResolveToken(req.Token) if err != nil { return err @@ -64,7 +65,6 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub defer sub.Unsubscribe() ctx := serverStream.Context() - elog := &eventLogger{logger: logger} for { events, err := sub.Next(ctx) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index ca730e7449..eef34c3f60 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/serf/coordinate" "github.com/mitchellh/hashstructure" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" @@ -1576,6 +1577,25 @@ func (csn *CheckServiceNode) BestAddress(wan bool) (string, int) { return addr, port } +func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecision { + if csn.Node == nil || csn.Service == nil { + return acl.Deny + } + + // TODO(streaming): add enterprise test that uses namespaces + authzContext := new(acl.AuthorizerContext) + csn.Service.FillAuthzContext(authzContext) + + if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow { + return acl.Deny + } + + if authz.ServiceRead(csn.Service.Service, authzContext) != acl.Allow { + return acl.Deny + } + return acl.Allow +} + type CheckServiceNodes []CheckServiceNode // Shuffle does an in-place random shuffle using the Fisher-Yates algorithm. diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index a9c75bca00..0b4e9c497e 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -8,13 +8,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestEncodeDecode(t *testing.T) { @@ -1152,7 +1154,7 @@ func TestStructs_HealthCheck_Clone(t *testing.T) { } } -func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) { +func TestCheckServiceNodes_Shuffle(t *testing.T) { // Make a huge list of nodes. var nodes CheckServiceNodes for i := 0; i < 100; i++ { @@ -1185,7 +1187,7 @@ func TestStructs_CheckServiceNodes_Shuffle(t *testing.T) { } } -func TestStructs_CheckServiceNodes_Filter(t *testing.T) { +func TestCheckServiceNodes_Filter(t *testing.T) { nodes := CheckServiceNodes{ CheckServiceNode{ Node: &Node{ @@ -1288,6 +1290,79 @@ func TestStructs_CheckServiceNodes_Filter(t *testing.T) { } } +func TestCheckServiceNodes_CanRead(t *testing.T) { + type testCase struct { + name string + csn CheckServiceNode + authz acl.Authorizer + expected acl.EnforcementDecision + } + + fn := func(t *testing.T, tc testCase) { + actual := tc.csn.CanRead(tc.authz) + require.Equal(t, tc.expected, actual) + } + + var testCases = []testCase{ + { + name: "empty", + expected: acl.Deny, + }, + { + name: "node read not authorized", + csn: CheckServiceNode{ + Node: &Node{Node: "name"}, + Service: &NodeService{Service: "service-name"}, + }, + authz: aclAuthorizerCheckServiceNode{allowService: true}, + expected: acl.Deny, + }, + { + name: "service read not authorized", + csn: CheckServiceNode{ + Node: &Node{Node: "name"}, + Service: &NodeService{Service: "service-name"}, + }, + authz: aclAuthorizerCheckServiceNode{allowNode: true}, + expected: acl.Deny, + }, + { + name: "read authorized", + csn: CheckServiceNode{ + Node: &Node{Node: "name"}, + Service: &NodeService{Service: "service-name"}, + }, + authz: acl.AllowAll(), + expected: acl.Allow, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +type aclAuthorizerCheckServiceNode struct { + acl.Authorizer + allowNode bool + allowService bool +} + +func (a aclAuthorizerCheckServiceNode) ServiceRead(string, *acl.AuthorizerContext) acl.EnforcementDecision { + if a.allowService { + return acl.Allow + } + return acl.Deny +} + +func (a aclAuthorizerCheckServiceNode) NodeRead(string, *acl.AuthorizerContext) acl.EnforcementDecision { + if a.allowNode { + return acl.Allow + } + return acl.Deny +} + func TestStructs_DirEntry_Clone(t *testing.T) { e := &DirEntry{ LockIndex: 5,