Consul is a distributed, highly available, and data center aware solution to connect and configure applications across dynamic, distributed infrastructure.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
3.8 KiB

package submatview
import (
"context"
"testing"
"time"
mock "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/acl/resolver"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// TestLocalMaterializer runs a LocalMaterializer against an in-process event
// publisher and checks, in order, that the view receives the snapshot event,
// then a subsequently published event, and finally nothing once the
// authorizer returned for the request token has been swapped to deny-all.
func TestLocalMaterializer(t *testing.T) {
	const (
		index = 123
		topic = pbsubscribe.Topic_ServiceResolver
		key   = "web"
		token = "some-acl-token"
	)

	// resolverEvent builds an event on topic at idx carrying a service
	// resolver config entry named key, with metaKey set to "true" in its
	// metadata.
	resolverEvent := func(idx uint64, metaKey string) stream.Event {
		return stream.Event{
			Topic: topic,
			Index: idx,
			Payload: state.EventPayloadConfigEntry{
				Value: &structs.ServiceResolverConfigEntry{
					Name: key,
					Meta: map[string]string{metaKey: "true"},
				},
			},
		}
	}

	snapshotEvent := resolverEvent(index, "snapshot")
	publishedEvent1 := resolverEvent(index+1, "published")
	publishedEvent2 := resolverEvent(index+2, "published")

	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	// The snapshot handler always appends snapshotEvent and reports the base
	// index.
	snapshotFn := func(_ stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
		buf.Append([]stream.Event{snapshotEvent})
		return index, nil
	}

	publisher := stream.NewEventPublisher(10 * time.Second)
	publisher.RegisterHandler(topic, snapshotFn, false)
	go publisher.Run(ctx)

	// Wrapping the authorizer in a struct lets the test replace it later
	// while the resolver mock keeps returning the same pointer.
	authz := &struct{ acl.Authorizer }{Authorizer: acl.AllowAll()}

	aclResolver := NewMockACLResolver(t)
	aclResolver.
		On("ResolveTokenAndDefaultMeta", token, mock.Anything, mock.Anything).
		Return(resolver.Result{Authorizer: authz}, nil)

	// newRequest produces the subscribe request for key on topic, starting
	// from the supplied index and authenticated with token.
	newRequest := func(idx uint64) *pbsubscribe.SubscribeRequest {
		subject := &pbsubscribe.SubscribeRequest_NamedSubject{
			NamedSubject: &pbsubscribe.NamedSubject{Key: key},
		}
		return &pbsubscribe.SubscribeRequest{
			Topic:   topic,
			Index:   idx,
			Subject: subject,
			Token:   token,
		}
	}

	view := newTestView()
	materializer := NewLocalMaterializer(LocalMaterializerDeps{
		Backend:     publisher,
		ACLResolver: aclResolver,
		Deps: Deps{
			View:    view,
			Request: newRequest,
		},
	})
	go materializer.Run(ctx)

	// The first batch delivered to the view must be the snapshot.
	received := view.getEvents(t)
	require.Len(t, received, 1)
	require.Equal(t, snapshotEvent.Payload.ToSubscriptionEvent(index), received[0])

	// An event published afterwards must reach the view as well.
	publisher.Publish([]stream.Event{publishedEvent1})
	received = view.getEvents(t)
	require.Len(t, received, 1)
	require.Equal(t, publishedEvent1.Payload.ToSubscriptionEvent(index+1), received[0])

	// After swapping in a deny-all authorizer, newly published events must
	// no longer be delivered.
	authz.Authorizer = acl.DenyAll()
	publisher.Publish([]stream.Event{publishedEvent2})
	view.expectNoEvents(t)
}
// newTestView returns a testView whose events channel is unbuffered.
func newTestView() *testView {
	v := new(testView)
	v.eventsCh = make(chan []*pbsubscribe.Event)
	return v
}
// testView is the view handed to the materializer in TestLocalMaterializer.
// It forwards every batch of events passed to Update onto eventsCh so the
// test can observe exactly what was delivered.
type testView struct {
// eventsCh carries each batch of events given to Update; it is read by
// getEvents and expectNoEvents.
eventsCh chan []*pbsubscribe.Event
}
// Reset is a no-op; testView keeps no state that needs clearing.
func (testView) Reset() {}
// Result ignores the supplied index and always returns nil.
func (testView) Result(_ uint64) any {
	return nil
}
// Update forwards the given batch of events to eventsCh and always returns a
// nil error.
func (v *testView) Update(events []*pbsubscribe.Event) error {
// newTestView creates eventsCh unbuffered, so this send blocks until
// getEvents or expectNoEvents receives the batch.
v.eventsCh <- events
return nil
}
// getEvents returns the next batch of events delivered to the view, failing
// the test if none arrives within 100 milliseconds.
func (v *testView) getEvents(t *testing.T) []*pbsubscribe.Event {
	t.Helper()

	deadline := time.After(100 * time.Millisecond)
	select {
	case batch := <-v.eventsCh:
		return batch
	case <-deadline:
	}

	t.Fatal("timeout waiting for view to receive events")
	// t.Fatal is not a terminating statement for the compiler, so an
	// explicit return is still required.
	return nil
}
// expectNoEvents fails the test if the view is handed any events within the
// next 100 milliseconds, and returns quietly otherwise.
func (v *testView) expectNoEvents(t *testing.T) {
	t.Helper()

	quietPeriod := time.After(100 * time.Millisecond)
	select {
	case <-quietPeriod:
		// Nothing arrived in time, which is the expected outcome.
		return
	case <-v.eventsCh:
		t.Fatal("expected no events to be received")
	}
}