mirror of https://github.com/hashicorp/consul
subscribe: Add steps to rpc/subscribe tests
To make them easier to followpull/8893/head
parent
57a7057067
commit
f185124320
|
@ -32,11 +32,39 @@ import (
|
|||
func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
||||
backend, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srv := NewServer(backend, hclog.New(nil))
|
||||
addr := newTestServer(t, srv)
|
||||
addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
|
||||
ids := newCounter()
|
||||
|
||||
{
|
||||
var req *structs.RegisterRequest
|
||||
runStep(t, "register two instances of the redis service", func(t *testing.T) {
|
||||
req = &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
|
||||
|
||||
req = &structs.RegisterRequest{
|
||||
Node: "node2",
|
||||
Address: "1.2.3.4",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
})
|
||||
|
||||
runStep(t, "register a service by a different name", func(t *testing.T) {
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "other",
|
||||
Address: "2.3.4.5",
|
||||
|
@ -49,33 +77,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("other"), req))
|
||||
}
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg2"), req))
|
||||
}
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node2",
|
||||
Address: "1.2.3.4",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "1.1.1.1",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
@ -84,6 +86,10 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, conn.Close))
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
|
||||
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
|
@ -91,19 +97,18 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
for i := 0; i < 3; i++ {
|
||||
snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
|
||||
}
|
||||
})
|
||||
|
||||
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
|
||||
expected := []*pbsubscribe.Event{
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Index: ids.For("reg3"),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
|
@ -135,7 +140,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Index: ids.For("reg3"),
|
||||
Payload: &pbsubscribe.Event_ServiceHealth{
|
||||
ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
|
@ -167,16 +172,17 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "redis",
|
||||
Index: ids.Last(),
|
||||
Index: ids.For("reg3"),
|
||||
Payload: &pbsubscribe.Event_EndOfSnapshot{EndOfSnapshot: true},
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, snapshotEvents)
|
||||
})
|
||||
|
||||
// Update the registration by adding a check.
|
||||
runStep(t, "update the registration by adding a check", func(t *testing.T) {
|
||||
req.Check = &structs.HealthCheck{
|
||||
Node: "node2",
|
||||
CheckID: types.CheckID("check1"),
|
||||
CheckID: "check1",
|
||||
ServiceID: "redis1",
|
||||
ServiceName: "redis",
|
||||
Name: "check 1",
|
||||
|
@ -229,6 +235,7 @@ func TestServer_Subscribe_IntegrationWithBackend(t *testing.T) {
|
|||
},
|
||||
}
|
||||
assertDeepEqual(t, expectedEvent, event)
|
||||
})
|
||||
}
|
||||
|
||||
type eventOrError struct {
|
||||
|
@ -312,7 +319,7 @@ func newTestBackend() (*testBackend, error) {
|
|||
|
||||
var _ Backend = (*testBackend)(nil)
|
||||
|
||||
func newTestServer(t *testing.T, server *Server) net.Addr {
|
||||
func runTestServer(t *testing.T, server *Server) net.Addr {
|
||||
addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")}
|
||||
var grpcServer *gogrpc.Server
|
||||
handler := grpc.NewHandler(addr, func(srv *gogrpc.Server) {
|
||||
|
@ -373,12 +380,12 @@ func raftIndex(ids *counter, created, modified string) pbcommon.RaftIndex {
|
|||
func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
||||
backendLocal, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
addrLocal := newTestServer(t, NewServer(backendLocal, hclog.New(nil)))
|
||||
addrLocal := runTestServer(t, NewServer(backendLocal, hclog.New(nil)))
|
||||
|
||||
backendRemoteDC, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srvRemoteDC := NewServer(backendRemoteDC, hclog.New(nil))
|
||||
addrRemoteDC := newTestServer(t, srvRemoteDC)
|
||||
addrRemoteDC := runTestServer(t, srvRemoteDC)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
@ -389,8 +396,10 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
backendLocal.forwardConn = connRemoteDC
|
||||
|
||||
ids := newCounter()
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
|
||||
var req *structs.RegisterRequest
|
||||
runStep(t, "register three services", func(t *testing.T) {
|
||||
req = &structs.RegisterRequest{
|
||||
Node: "other",
|
||||
Address: "2.3.4.5",
|
||||
Datacenter: "dc2",
|
||||
|
@ -402,9 +411,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
},
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg1"), req))
|
||||
}
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
req = &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc2",
|
||||
|
@ -416,9 +423,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
},
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg2"), req))
|
||||
}
|
||||
|
||||
req := &structs.RegisterRequest{
|
||||
req = &structs.RegisterRequest{
|
||||
Node: "node2",
|
||||
Address: "1.2.3.4",
|
||||
Datacenter: "dc2",
|
||||
|
@ -430,11 +435,16 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
},
|
||||
}
|
||||
require.NoError(t, backendRemoteDC.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
})
|
||||
|
||||
connLocal, err := gogrpc.DialContext(ctx, addrLocal.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, connLocal.Close))
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
|
||||
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(connLocal)
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
|
@ -442,15 +452,14 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
Datacenter: "dc2",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
for i := 0; i < 3; i++ {
|
||||
snapshotEvents = append(snapshotEvents, getEvent(t, chEvents))
|
||||
}
|
||||
})
|
||||
|
||||
runStep(t, "receive the initial snapshot of events", func(t *testing.T) {
|
||||
expected := []*pbsubscribe.Event{
|
||||
{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
|
@ -524,8 +533,9 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, snapshotEvents)
|
||||
})
|
||||
|
||||
// Update the registration by adding a check.
|
||||
runStep(t, "update the registration by adding a check", func(t *testing.T) {
|
||||
req.Check = &structs.HealthCheck{
|
||||
Node: "node2",
|
||||
CheckID: types.CheckID("check1"),
|
||||
|
@ -581,6 +591,7 @@ func TestServer_Subscribe_IntegrationWithBackend_ForwardToDC(t *testing.T) {
|
|||
},
|
||||
}
|
||||
assertDeepEqual(t, expectedEvent, event)
|
||||
})
|
||||
}
|
||||
|
||||
// TODO: test case for converting stream.Events to pbsubscribe.Events, including framing events
|
||||
|
@ -592,10 +603,10 @@ func TestServer_Subscribe_IntegrationWithBackend_FilterEventsByACLToken(t *testi
|
|||
|
||||
backend, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srv := NewServer(backend, hclog.New(nil))
|
||||
addr := newTestServer(t, srv)
|
||||
addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
|
||||
token := "this-token-is-good"
|
||||
|
||||
// Create a policy for the test token.
|
||||
runStep(t, "create an ACL policy", func(t *testing.T) {
|
||||
rules := `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
|
@ -613,17 +624,19 @@ node "node1" {
|
|||
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
|
||||
|
||||
// TODO: is there any easy way to do this with the acl package?
|
||||
token := "this-token-is-good"
|
||||
backend.authorizer = func(tok string) acl.Authorizer {
|
||||
if tok == token {
|
||||
return authorizer
|
||||
}
|
||||
return acl.DenyAll()
|
||||
}
|
||||
})
|
||||
|
||||
ids := newCounter()
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
var req *structs.RegisterRequest
|
||||
|
||||
runStep(t, "register services", func(t *testing.T) {
|
||||
req = &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
|
@ -669,7 +682,7 @@ node "node1" {
|
|||
},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg3"), req))
|
||||
}
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
@ -679,8 +692,9 @@ node "node1" {
|
|||
t.Cleanup(logError(t, conn.Close))
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
|
||||
// Start a Subscribe call to our streaming endpoint for the service we have access to.
|
||||
{
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
|
||||
runStep(t, "setup a client, subscribe to a topic, and receive a snapshot", func(t *testing.T) {
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "foo",
|
||||
|
@ -688,7 +702,6 @@ node "node1" {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
event := getEvent(t, chEvents)
|
||||
|
@ -696,9 +709,10 @@ node "node1" {
|
|||
require.Equal(t, "node1", event.GetServiceHealth().CheckServiceNode.Node.Node)
|
||||
|
||||
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
|
||||
})
|
||||
|
||||
// Update the service with a new port to trigger a new event.
|
||||
req := &structs.RegisterRequest{
|
||||
runStep(t, "update the service to receive an event", func(t *testing.T) {
|
||||
req = &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
|
@ -718,12 +732,13 @@ node "node1" {
|
|||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg4"), req))
|
||||
|
||||
event = getEvent(t, chEvents)
|
||||
event := getEvent(t, chEvents)
|
||||
service := event.GetServiceHealth().CheckServiceNode.Service
|
||||
require.Equal(t, "foo", service.Service)
|
||||
require.Equal(t, int32(1234), service.Port)
|
||||
})
|
||||
|
||||
// Now update the service on the denied node and make sure we don't see an event.
|
||||
runStep(t, "updates to the service on the denied node, should not send an event", func(t *testing.T) {
|
||||
req = &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "denied",
|
||||
|
@ -744,15 +759,10 @@ node "node1" {
|
|||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg5"), req))
|
||||
|
||||
select {
|
||||
case event := <-chEvents:
|
||||
t.Fatalf("should not have received event: %v", event)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
assertNoEvents(t, chEvents)
|
||||
})
|
||||
|
||||
// Start another subscribe call for bar, which the token shouldn't have access to.
|
||||
{
|
||||
runStep(t, "subscribe to a topic where events are not visible", func(t *testing.T) {
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "bar",
|
||||
|
@ -784,21 +794,17 @@ node "node1" {
|
|||
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||
}
|
||||
require.NoError(t, backend.store.EnsureRegistration(ids.Next("reg6"), req))
|
||||
|
||||
select {
|
||||
case event := <-chEvents:
|
||||
t.Fatalf("should not have received event: %v", event)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
assertNoEvents(t, chEvents)
|
||||
})
|
||||
}
|
||||
|
||||
func TestServer_Subscribe_IntegrationWithBackend_ACLUpdate(t *testing.T) {
|
||||
backend, err := newTestBackend()
|
||||
require.NoError(t, err)
|
||||
srv := NewServer(backend, hclog.New(nil))
|
||||
addr := newTestServer(t, srv)
|
||||
addr := runTestServer(t, NewServer(backend, hclog.New(nil)))
|
||||
token := "this-token-is-good"
|
||||
|
||||
runStep(t, "create an ACL policy", func(t *testing.T) {
|
||||
rules := `
|
||||
service "foo" {
|
||||
policy = "write"
|
||||
|
@ -816,24 +822,26 @@ node "node1" {
|
|||
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
|
||||
|
||||
// TODO: is there any easy way to do this with the acl package?
|
||||
token := "this-token-is-good"
|
||||
backend.authorizer = func(tok string) acl.Authorizer {
|
||||
if tok == token {
|
||||
return authorizer
|
||||
}
|
||||
return acl.DenyAll()
|
||||
}
|
||||
})
|
||||
|
||||
ids := newCounter()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
conn, err := gogrpc.DialContext(ctx, addr.String(), gogrpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(logError(t, conn.Close))
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
|
||||
runStep(t, "setup a client and subscribe to a topic", func(t *testing.T) {
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
streamHandle, err := streamClient.Subscribe(ctx, &pbsubscribe.SubscribeRequest{
|
||||
Topic: pbsubscribe.Topic_ServiceHealth,
|
||||
Key: "foo",
|
||||
|
@ -841,11 +849,11 @@ node "node1" {
|
|||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
chEvents := make(chan eventOrError, 0)
|
||||
go recvEvents(chEvents, streamHandle)
|
||||
|
||||
require.True(t, getEvent(t, chEvents).GetEndOfSnapshot())
|
||||
})
|
||||
|
||||
runStep(t, "updates to the token should close the stream", func(t *testing.T) {
|
||||
tokenID, err := uuid.GenerateUUID()
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -858,12 +866,22 @@ node "node1" {
|
|||
|
||||
select {
|
||||
case item := <-chEvents:
|
||||
require.Error(t, item.err, "got event: %v", item.event)
|
||||
require.Error(t, item.err, "got event instead of an error: %v", item.event)
|
||||
s, _ := status.FromError(item.err)
|
||||
require.Equal(t, codes.Aborted, s.Code())
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("timeout waiting for aborted error")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func assertNoEvents(t *testing.T, chEvents chan eventOrError) {
|
||||
t.Helper()
|
||||
select {
|
||||
case event := <-chEvents:
|
||||
t.Fatalf("should not have received event: %v", event)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
func logError(t *testing.T, f func() error) func() {
|
||||
|
@ -873,3 +891,10 @@ func logError(t *testing.T, f func() error) func() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
t.Helper()
|
||||
if !t.Run(name, fn) {
|
||||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,9 +5,10 @@ import (
|
|||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
Loading…
Reference in New Issue