diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index e3d0adefd0..ac1e6e5285 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -73,7 +73,7 @@ func (c *StreamingHealthServices) Fetch(opts cache.FetchOptions, req cache.Reque Token: srvReq.Token, Datacenter: srvReq.Datacenter, Index: index, - // TODO(streaming): set Namespace from srvReq.EnterpriseMeta.Namespace + Namespace: srvReq.EnterpriseMeta.GetNamespace(), } if srvReq.Connect { req.Topic = pbsubscribe.Topic_ServiceHealthConnect diff --git a/agent/cache-types/streaming_health_services_test.go b/agent/cache-types/streaming_health_services_test.go index 5c963c7a8a..768962aa83 100644 --- a/agent/cache-types/streaming_health_services_test.go +++ b/agent/cache-types/streaming_health_services_test.go @@ -14,11 +14,13 @@ import ( "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbsubscribe" ) func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { - client := NewTestStreamingClient() + namespace := pbcommon.DefaultEnterpriseMeta.Namespace + client := NewTestStreamingClient(namespace) typ := StreamingHealthServices{deps: MaterializerDeps{ Client: client, Logger: hclog.Default(), @@ -33,8 +35,9 @@ func TestStreamingHealthServices_EmptySnapshot(t *testing.T) { Timeout: time.Second, } req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace), } empty := &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{}, @@ -215,8 +218,17 @@ func requireResultsSame(t *testing.T, want, got *structs.IndexedCheckServiceNode require.ElementsMatch(t, wantIDs, gotIDs) } +// getNamespace returns a namespace if namespace support exists, otherwise +// returns the empty string. It allows the same tests to work in both oss and ent +// without duplicating the tests. +func getNamespace(ns string) string { + meta := structs.EnterpriseMetaInitializer(ns) + return meta.GetNamespace() +} + func TestStreamingHealthServices_FullSnapshot(t *testing.T) { - client := NewTestStreamingClient() + namespace := getNamespace("ns2") + client := NewTestStreamingClient(namespace) typ := StreamingHealthServices{deps: MaterializerDeps{ Client: client, Logger: hclog.Default(), @@ -238,8 +250,9 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { Timeout: 1 * time.Second, } req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace), } gatherNodes := func(res interface{}) []string { @@ -345,7 +358,8 @@ func TestStreamingHealthServices_FullSnapshot(t *testing.T) { } func TestStreamingHealthServices_EventBatches(t *testing.T) { - client := NewTestStreamingClient() + namespace := getNamespace("ns3") + client := NewTestStreamingClient(namespace) typ := StreamingHealthServices{deps: MaterializerDeps{ Client: client, Logger: hclog.Default(), @@ -366,8 +380,9 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { Timeout: 1 * time.Second, } req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace), } gatherNodes := func(res interface{}) []string { @@ -415,7 +430,8 @@ func TestStreamingHealthServices_EventBatches(t *testing.T) { } func TestStreamingHealthServices_Filtering(t *testing.T) { - client := NewTestStreamingClient() + namespace := getNamespace("ns3") + client := NewTestStreamingClient(namespace) typ := StreamingHealthServices{deps: MaterializerDeps{ Client: client, Logger: hclog.Default(), @@ -436,8 +452,9 @@ func TestStreamingHealthServices_Filtering(t *testing.T) { Timeout: 1 * time.Second, } req := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", + Datacenter: "dc1", + ServiceName: "web", + EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace), QueryOptions: structs.QueryOptions{ Filter: `Node.Node == "node2"`, }, diff --git a/agent/cache-types/streaming_test.go b/agent/cache-types/streaming_test.go index 09721817a0..b12809c3c5 100644 --- a/agent/cache-types/streaming_test.go +++ b/agent/cache-types/streaming_test.go @@ -2,6 +2,7 @@ package cachetype import ( "context" + "fmt" "google.golang.org/grpc" @@ -12,8 +13,9 @@ import ( // for queueing up custom events to a subscriber. type TestStreamingClient struct { pbsubscribe.StateChangeSubscription_SubscribeClient - events chan eventOrErr - ctx context.Context + events chan eventOrErr + ctx context.Context + expectedNamespace string } type eventOrErr struct { @@ -21,17 +23,22 @@ type eventOrErr struct { Event *pbsubscribe.Event } -func NewTestStreamingClient() *TestStreamingClient { +func NewTestStreamingClient(ns string) *TestStreamingClient { return &TestStreamingClient{ - events: make(chan eventOrErr, 32), + events: make(chan eventOrErr, 32), + expectedNamespace: ns, } } func (t *TestStreamingClient) Subscribe( ctx context.Context, - _ *pbsubscribe.SubscribeRequest, + req *pbsubscribe.SubscribeRequest, _ ...grpc.CallOption, ) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { + if req.Namespace != t.expectedNamespace { + return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v", + req.Namespace, t.expectedNamespace) + } t.ctx = ctx return t, nil } diff --git a/agent/consul/state/catalog_events.go b/agent/consul/state/catalog_events.go index f6af8e019f..0a11a94360 100644 --- a/agent/consul/state/catalog_events.go +++ b/agent/consul/state/catalog_events.go @@ -32,7 +32,8 @@ func (e EventPayloadCheckServiceNode) FilterByKey(key, namespace string) bool { if e.key != "" { name = e.key } - return key == name && namespace == e.Value.Service.EnterpriseMeta.GetNamespace() + ns := e.Value.Service.EnterpriseMeta.GetNamespace() + return (key == "" || key == name) && (namespace == "" || namespace == ns) } // serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot @@ -45,8 +46,8 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc { defer tx.Abort() connect := topic == topicServiceHealthConnect - // TODO(namespace-streaming): plumb entMeta through from SubscribeRequest - idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, nil) + entMeta := structs.EnterpriseMetaInitializer(req.Namespace) + idx, nodes, err := checkServiceNodesTxn(tx, nil, req.Key, connect, &entMeta) if err != nil { return 0, err } @@ -349,8 +350,7 @@ func getPayloadCheckServiceNode(payload stream.Payload) *structs.CheckServiceNod // parseCheckServiceNodes but is more efficient since we know they are all on // the same node. func newServiceHealthEventsForNode(tx ReadTxn, idx uint64, node string) ([]stream.Event, error) { - // TODO(namespace-streaming): figure out the right EntMeta and mystery arg. - services, err := catalogServiceListByNode(tx, node, nil, false) + services, err := catalogServiceListByNode(tx, node, structs.WildcardEnterpriseMeta(), true) if err != nil { return nil, err } @@ -384,8 +384,7 @@ func getNodeAndChecks(tx ReadTxn, node string) (*structs.Node, serviceChecksFunc } n := nodeRaw.(*structs.Node) - // TODO(namespace-streaming): work out what EntMeta is needed here, wildcard? - iter, err := catalogListChecksByNode(tx, node, nil) + iter, err := catalogListChecksByNode(tx, node, structs.WildcardEnterpriseMeta()) if err != nil { return nil, nil, err } diff --git a/agent/consul/state/catalog_events_test.go b/agent/consul/state/catalog_events_test.go index ac4a07d67e..1efead52ad 100644 --- a/agent/consul/state/catalog_events_test.go +++ b/agent/consul/state/catalog_events_test.go @@ -8,6 +8,8 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/proto/pbcommon" + "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" @@ -1459,3 +1461,98 @@ func newTestEventServiceHealthDeregister(index uint64, nodeNum int, svc string) }, } } + +func TestEventPayloadCheckServiceNode_FilterByKey(t *testing.T) { + type testCase struct { + name string + payload EventPayloadCheckServiceNode + key string + namespace string + expected bool + } + + fn := func(t *testing.T, tc testCase) { + if tc.namespace != "" && pbcommon.DefaultEnterpriseMeta.Namespace == "" { + t.Skip("cant test namespace matching without namespace support") + } + + require.Equal(t, tc.expected, tc.payload.FilterByKey(tc.key, tc.namespace)) + } + + var testCases = []testCase{ + { + name: "no key or namespace", + payload: newPayloadCheckServiceNode("srv1", "ns1"), + expected: true, + }, + { + name: "no key, with namespace match", + payload: newPayloadCheckServiceNode("srv1", "ns1"), + namespace: "ns1", + expected: true, + }, + { + name: "no namespace, with key match", + payload: newPayloadCheckServiceNode("srv1", "ns1"), + key: "srv1", + expected: true, + }, + { + name: "key match, namespace mismatch", + payload: newPayloadCheckServiceNode("srv1", "ns1"), + key: "srv1", + namespace: "ns2", + expected: false, + }, + { + name: "key mismatch, namespace match", + payload: newPayloadCheckServiceNode("srv1", "ns1"), + key: "srv2", + namespace: "ns1", + expected: false, + }, + { + name: "override key match", + payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv1"), + key: "srv1", + namespace: "ns1", + expected: true, + }, + { + name: "override key match", + payload: newPayloadCheckServiceNodeWithKey("proxy", "ns1", "srv2"), + key: "proxy", + namespace: "ns1", + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } +} + +func newPayloadCheckServiceNode(service, namespace string) EventPayloadCheckServiceNode { + return EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: service, + EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace), + }, + }, + } +} + +func newPayloadCheckServiceNodeWithKey(service, namespace, key string) EventPayloadCheckServiceNode { + return EventPayloadCheckServiceNode{ + Value: &structs.CheckServiceNode{ + Service: &structs.NodeService{ + Service: service, + EnterpriseMeta: structs.EnterpriseMetaInitializer(namespace), + }, + }, + key: key, + } +} diff --git a/agent/consul/stream/event_publisher.go b/agent/consul/stream/event_publisher.go index 53a1bf8dd0..379bfdfa8d 100644 --- a/agent/consul/stream/event_publisher.go +++ b/agent/consul/stream/event_publisher.go @@ -267,7 +267,7 @@ func (e *EventPublisher) getCachedSnapshotLocked(req *SubscribeRequest) *eventSn e.snapCache[req.Topic] = topicSnaps } - snap, ok := topicSnaps[req.Key] + snap, ok := topicSnaps[snapCacheKey(req)] if ok && snap.err() == nil { return snap } @@ -279,12 +279,16 @@ func (e *EventPublisher) setCachedSnapshotLocked(req *SubscribeRequest, snap *ev if e.snapCacheTTL == 0 { return } - e.snapCache[req.Topic][req.Key] = snap + e.snapCache[req.Topic][snapCacheKey(req)] = snap // Setup a cache eviction time.AfterFunc(e.snapCacheTTL, func() { e.lock.Lock() defer e.lock.Unlock() - delete(e.snapCache[req.Topic], req.Key) + delete(e.snapCache[req.Topic], snapCacheKey(req)) }) } + +func snapCacheKey(req *SubscribeRequest) string { + return fmt.Sprintf(req.Namespace + "/" + req.Key) +} diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index bcc76acef7..472b0ce90d 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -134,7 +134,7 @@ func newEventFromBatch(req SubscribeRequest, events []Event) Event { func filterByKey(req SubscribeRequest, events []Event) (Event, bool) { event := newEventFromBatch(req, events) - if req.Key == "" { + if req.Key == "" && req.Namespace == "" { return event, true } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index dc6c8a06c2..2c192b1840 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -138,63 +138,147 @@ func publishTestEvent(index uint64, b *eventBuffer, key string) { b.Append([]Event{e}) } -func TestFilter_NoKey(t *testing.T) { - events := make(PayloadEvents, 0, 5) - events = append(events, newSimpleEvent("One", 102), newSimpleEvent("Two", 102)) - - req := SubscribeRequest{Topic: testTopic} - actual, ok := filterByKey(req, events) - require.True(t, ok) - require.Equal(t, Event{Topic: testTopic, Index: 102, Payload: events}, actual) - - // test that a new array was not allocated - require.Equal(t, cap(actual.Payload.(PayloadEvents)), 5) -} - func newSimpleEvent(key string, index uint64) Event { return Event{Index: index, Payload: simplePayload{key: key}} } -func TestFilter_WithKey_AllEventsMatch(t *testing.T) { - events := make(PayloadEvents, 0, 5) - events = append(events, newSimpleEvent("Same", 103), newSimpleEvent("Same", 103)) - - req := SubscribeRequest{Topic: testTopic, Key: "Same"} - actual, ok := filterByKey(req, events) - require.True(t, ok) - expected := Event{Topic: testTopic, Index: 103, Payload: events} - require.Equal(t, expected, actual) - - // test that a new array was not allocated - require.Equal(t, 5, cap(actual.Payload.(PayloadEvents))) -} - -func TestFilter_WithKey_SomeEventsMatch(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, - newSimpleEvent("Same", 104), - newSimpleEvent("Other", 0), - newSimpleEvent("Same", 0)) - - req := SubscribeRequest{Topic: testTopic, Key: "Same"} - actual, ok := filterByKey(req, events) - require.True(t, ok) - expected := Event{ - Topic: testTopic, - Index: 104, - Payload: PayloadEvents{newSimpleEvent("Same", 104), newSimpleEvent("Same", 0)}, +func TestFilterByKey(t *testing.T) { + type testCase struct { + name string + req SubscribeRequest + events []Event + expectEvent bool + expected Event + expectedCap int } - require.Equal(t, expected, actual) - // test that a new array was allocated with the correct size - require.Equal(t, cap(actual.Payload.(PayloadEvents)), 2) + fn := func(t *testing.T, tc testCase) { + events := make(PayloadEvents, 0, 5) + events = append(events, tc.events...) + + actual, ok := filterByKey(tc.req, events) + require.Equal(t, tc.expectEvent, ok) + if !tc.expectEvent { + return + } + + require.Equal(t, tc.expected, actual) + // test if there was a new array allocated or not + require.Equal(t, tc.expectedCap, cap(actual.Payload.(PayloadEvents))) + } + + var testCases = []testCase{ + { + name: "all events match, no key or namespace", + req: SubscribeRequest{Topic: testTopic}, + events: []Event{ + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)}, + expectEvent: true, + expected: Event{ + Topic: testTopic, + Index: 102, + Payload: PayloadEvents{ + newSimpleEvent("One", 102), + newSimpleEvent("Two", 102)}}, + expectedCap: 5, + }, + { + name: "all events match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)}, + expectEvent: true, + expected: Event{ + Topic: testTopic, + Index: 103, + Payload: PayloadEvents{ + newSimpleEvent("Same", 103), + newSimpleEvent("Same", 103)}}, + expectedCap: 5, + }, + { + name: "all events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")}, + expectEvent: true, + expected: Event{ + Topic: testTopic, + Index: 22, + Payload: PayloadEvents{ + newNSEvent("Something", "apps"), + newNSEvent("Other", "apps")}}, + expectedCap: 5, + }, + { + name: "some evens match, no namespace", + req: SubscribeRequest{Topic: testTopic, Key: "Same"}, + events: []Event{ + newSimpleEvent("Same", 104), + newSimpleEvent("Other", 104), + newSimpleEvent("Same", 104)}, + expectEvent: true, + expected: Event{ + Topic: testTopic, + Index: 104, + Payload: PayloadEvents{ + newSimpleEvent("Same", 104), + newSimpleEvent("Same", 104)}}, + expectedCap: 2, + }, + { + name: "some events match, no key", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "apps"), + newNSEvent("db1", "dbs"), + newNSEvent("app2", "apps")}, + expectEvent: true, + expected: Event{ + Topic: testTopic, + Index: 22, + Payload: PayloadEvents{ + newNSEvent("app1", "apps"), + newNSEvent("app2", "apps")}}, + expectedCap: 2, + }, + { + name: "no events match key", + req: SubscribeRequest{Topic: testTopic, Key: "Other"}, + events: []Event{ + newSimpleEvent("Same", 0), + newSimpleEvent("Same", 0)}, + }, + { + name: "no events match namespace", + req: SubscribeRequest{Topic: testTopic, Namespace: "apps"}, + events: []Event{ + newNSEvent("app1", "group1"), + newNSEvent("app2", "group2")}, + expectEvent: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + fn(t, tc) + }) + } } -func TestFilter_WithKey_NoEventsMatch(t *testing.T) { - events := make([]Event, 0, 5) - events = append(events, newSimpleEvent("Same", 0), newSimpleEvent("Same", 0)) - - req := SubscribeRequest{Topic: testTopic, Key: "Other"} - _, ok := filterByKey(req, events) - require.False(t, ok) +func newNSEvent(key, namespace string) Event { + return Event{Index: 22, Payload: nsPayload{key: key, namespace: namespace}} +} + +type nsPayload struct { + key string + namespace string + value string +} + +func (p nsPayload) FilterByKey(key, namespace string) bool { + return (key == "" || key == p.key) && (namespace == "" || namespace == p.namespace) } diff --git a/agent/consul/subscribe_backend.go b/agent/consul/subscribe_backend.go index 56f2bac01a..d1888911d8 100644 --- a/agent/consul/subscribe_backend.go +++ b/agent/consul/subscribe_backend.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" agentgrpc "github.com/hashicorp/consul/agent/grpc" "github.com/hashicorp/consul/agent/rpc/subscribe" + "github.com/hashicorp/consul/agent/structs" ) type subscribeBackend struct { @@ -16,8 +17,12 @@ type subscribeBackend struct { // TODO: refactor Resolve methods to an ACLBackend that can be used by all // the endpoints. -func (s subscribeBackend) ResolveToken(token string) (acl.Authorizer, error) { - return s.srv.ResolveToken(token) +func (s subscribeBackend) ResolveTokenAndDefaultMeta( + token string, + entMeta *structs.EnterpriseMeta, + authzContext *acl.AuthorizerContext, +) (acl.Authorizer, error) { + return s.srv.ResolveTokenAndDefaultMeta(token, entMeta, authzContext) } var _ subscribe.Backend = (*subscribeBackend)(nil) diff --git a/agent/rpc/subscribe/logger.go b/agent/rpc/subscribe/logger.go index 8615fbd905..ddddb20ca5 100644 --- a/agent/rpc/subscribe/logger.go +++ b/agent/rpc/subscribe/logger.go @@ -37,11 +37,12 @@ func (s *streamID) String() string { return s.id } -func (h *Server) newLoggerForRequest(req *pbsubscribe.SubscribeRequest) Logger { - return h.Logger.With( +func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger { + return l.With( "topic", req.Topic.String(), "dc", req.Datacenter, "key", req.Key, + "namespace", req.Namespace, "index", req.Index, "stream_id", &streamID{}) } diff --git a/agent/rpc/subscribe/subscribe.go b/agent/rpc/subscribe/subscribe.go index 93c5e65d6c..71919babab 100644 --- a/agent/rpc/subscribe/subscribe.go +++ b/agent/rpc/subscribe/subscribe.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/pbsubscribe" ) @@ -35,15 +36,13 @@ type Logger interface { var _ pbsubscribe.StateChangeSubscriptionServer = (*Server)(nil) type Backend interface { - // TODO(streaming): Use ResolveTokenAndDefaultMeta instead once SubscribeRequest - // has an EnterpriseMeta. - ResolveToken(token string) (acl.Authorizer, error) + ResolveTokenAndDefaultMeta(token string, entMeta *structs.EnterpriseMeta, authzContext *acl.AuthorizerContext) (acl.Authorizer, error) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) Subscribe(req *stream.SubscribeRequest) (*stream.Subscription, error) } func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsubscribe.StateChangeSubscription_SubscribeServer) error { - logger := h.newLoggerForRequest(req) + logger := newLoggerForRequest(h.Logger, req) handled, err := h.Backend.Forward(req.Datacenter, forwardToDC(req, serverStream, logger)) if handled || err != nil { return err @@ -52,13 +51,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub logger.Trace("new subscription") defer logger.Trace("subscription closed") - // Resolve the token and create the ACL filter. - authz, err := h.Backend.ResolveToken(req.Token) + entMeta := structs.EnterpriseMetaInitializer(req.Namespace) + authz, err := h.Backend.ResolveTokenAndDefaultMeta(req.Token, &entMeta, nil) if err != nil { return err } - sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req)) + sub, err := h.Backend.Subscribe(toStreamSubscribeRequest(req, entMeta)) if err != nil { return err } @@ -90,13 +89,13 @@ func (h *Server) Subscribe(req *pbsubscribe.SubscribeRequest, serverStream pbsub } } -// TODO: can be replaced by mog conversion -func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest) *stream.SubscribeRequest { +func toStreamSubscribeRequest(req *pbsubscribe.SubscribeRequest, entMeta structs.EnterpriseMeta) *stream.SubscribeRequest { return &stream.SubscribeRequest{ - Topic: req.Topic, - Key: req.Key, - Token: req.Token, - Index: req.Index, + Topic: req.Topic, + Key: req.Key, + Token: req.Token, + Index: req.Index, + Namespace: entMeta.GetNamespace(), } } diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index cd5ebead82..bc41ed1e88 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -93,8 +93,9 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) { streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn) streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "redis", + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "redis", + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, }) require.NoError(t, err) @@ -130,7 +131,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { Expose: pbservice.ExposeConfig{}, }, RaftIndex: raftIndex(ids, "reg2", "reg2"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -160,7 +161,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { Expose: pbservice.ExposeConfig{}, }, RaftIndex: raftIndex(ids, "reg3", "reg3"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -209,7 +210,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { Expose: pbservice.ExposeConfig{}, }, RaftIndex: raftIndex(ids, "reg3", "reg3"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, Checks: []*pbservice.HealthCheck{ { @@ -220,7 +221,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) { ServiceID: "redis1", ServiceName: "redis", RaftIndex: raftIndex(ids, "update", "update"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -261,7 +262,7 @@ func getEvent(t *testing.T, ch chan eventOrError) *pbsubscribe.Event { case item := <-ch: require.NoError(t, item.err) return item.event - case <-time.After(10 * time.Second): + case <-time.After(2 * time.Second): t.Fatalf("timeout waiting on event from server") } return nil @@ -280,7 +281,11 @@ type testBackend struct { forwardConn *gogrpc.ClientConn } -func (b testBackend) ResolveToken(token string) (acl.Authorizer, error) { +func (b testBackend) ResolveTokenAndDefaultMeta( + token string, + _ *structs.EnterpriseMeta, + _ *acl.AuthorizerContext, +) (acl.Authorizer, error) { return b.authorizer(token), nil } @@ -440,6 +445,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis", Datacenter: "dc2", + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, }) require.NoError(t, err) go recvEvents(chEvents, streamHandle) @@ -474,7 +480,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, RaftIndex: raftIndex(ids, "reg2", "reg2"), }, }, @@ -504,7 +510,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, RaftIndex: raftIndex(ids, "reg3", "reg3"), }, }, @@ -554,7 +560,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { MeshGateway: pbservice.MeshGatewayConfig{}, Expose: pbservice.ExposeConfig{}, }, - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, Checks: []*pbservice.HealthCheck{ { @@ -565,7 +571,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) { ServiceID: "redis1", ServiceName: "redis", RaftIndex: raftIndex(ids, "update", "update"), - EnterpriseMeta: pbcommon.EnterpriseMeta{}, + EnterpriseMeta: pbcommon.DefaultEnterpriseMeta, }, }, }, @@ -595,10 +601,8 @@ node "node1" { policy = "write" } ` - authorizer, err := acl.NewAuthorizerFromRules( - "1", 0, rules, acl.SyntaxCurrent, - &acl.Config{WildcardName: structs.WildcardSpecifier}, - nil) + cfg := &acl.Config{WildcardName: structs.WildcardSpecifier} + authorizer, err := acl.NewAuthorizerFromRules("1", 0, rules, acl.SyntaxCurrent, cfg, nil) require.NoError(t, err) authorizer = acl.NewChainedAuthorizer([]acl.Authorizer{authorizer, acl.DenyAll()}) require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) @@ -676,9 +680,10 @@ node "node1" { runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) { streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{ - Topic: pbsubscribe.Topic_ServiceHealth, - Key: "foo", - Token: token, + Topic: pbsubscribe.Topic_ServiceHealth, + Key: "foo", + Token: token, + Namespace: pbcommon.DefaultEnterpriseMeta.Namespace, }) require.NoError(t, err) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 46a6ba3992..5e0956a062 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1626,9 +1626,8 @@ func (csn *CheckServiceNode) CanRead(authz acl.Authorizer) acl.EnforcementDecisi return acl.Deny } - // TODO(streaming): add enterprise test that uses namespaces authzContext := new(acl.AuthorizerContext) - csn.Service.FillAuthzContext(authzContext) + csn.Service.EnterpriseMeta.FillAuthzContext(authzContext) if authz.NodeRead(csn.Node.Node, authzContext) != acl.Allow { return acl.Deny diff --git a/agent/structs/structs_test.go b/agent/structs/structs_test.go index 0b4e9c497e..d6d85aee29 100644 --- a/agent/structs/structs_test.go +++ b/agent/structs/structs_test.go @@ -1290,7 +1290,7 @@ func TestCheckServiceNodes_Filter(t *testing.T) { } } -func TestCheckServiceNodes_CanRead(t *testing.T) { +func TestCheckServiceNode_CanRead(t *testing.T) { type testCase struct { name string csn CheckServiceNode diff --git a/proto/pbcommon/common_oss.go b/proto/pbcommon/common_oss.go new file mode 100644 index 0000000000..024f207faf --- /dev/null +++ b/proto/pbcommon/common_oss.go @@ -0,0 +1,5 @@ +// +build !consulent + +package pbcommon + +var DefaultEnterpriseMeta = EnterpriseMeta{} diff --git a/proto/pbservice/convert_oss.go b/proto/pbservice/convert_oss.go index 81048ca8d0..f49a84ac7f 100644 --- a/proto/pbservice/convert_oss.go +++ b/proto/pbservice/convert_oss.go @@ -1,3 +1,5 @@ +// +build !consulent + package pbservice import ( diff --git a/proto/pbservice/convert_oss_test.go b/proto/pbservice/convert_oss_test.go new file mode 100644 index 0000000000..7848e04d02 --- /dev/null +++ b/proto/pbservice/convert_oss_test.go @@ -0,0 +1,12 @@ +// +build !consulent + +package pbservice + +import ( + fuzz "github.com/google/gofuzz" + + "github.com/hashicorp/consul/agent/structs" +) + +func randEnterpriseMeta(_ *structs.EnterpriseMeta, _ fuzz.Continue) { +} diff --git a/proto/pbservice/convert_test.go b/proto/pbservice/convert_test.go index d2b81fdb3a..24889dc88a 100644 --- a/proto/pbservice/convert_test.go +++ b/proto/pbservice/convert_test.go @@ -108,8 +108,3 @@ func randInterface(m *interface{}, c fuzz.Continue) { } } } - -// TODO(streaming): this is a quick fix to get the tests passing in enterprise. -// This needs to use a real random value once enterprise support is complete. -func randEnterpriseMeta(_ *structs.EnterpriseMeta, _ fuzz.Continue) { -}