mirror of https://github.com/hashicorp/consul
Merge pull request #9188 from hashicorp/dnephin/more-streaming-tests
Add more streaming testspull/9840/head
commit
6b95e8dfe2
|
@ -2089,6 +2089,10 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
|
|||
// parseCheckServiceNodes is used to parse through a given set of services,
|
||||
// and query for an associated node and a set of checks. This is the inner
|
||||
// method used to return a rich set of results from a more simple query.
|
||||
//
|
||||
// TODO: idx parameter is not used except as a return value. Remove it.
|
||||
// TODO: err parameter is only used for early return. Remove it and check from the
|
||||
// caller.
|
||||
func parseCheckServiceNodes(
|
||||
tx ReadTxn, ws memdb.WatchSet, idx uint64,
|
||||
services structs.ServiceNodes,
|
||||
|
|
|
@ -49,8 +49,6 @@ func (e EventPayloadCheckServiceNode) MatchesKey(key, namespace string) bool {
|
|||
|
||||
// serviceHealthSnapshot returns a stream.SnapshotFunc that provides a snapshot
|
||||
// of stream.Events that describe the current state of a service health query.
|
||||
//
|
||||
// TODO: no tests for this yet
|
||||
func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
||||
return func(req stream.SubscribeRequest, buf stream.SnapshotAppender) (index uint64, err error) {
|
||||
tx := db.ReadTxn()
|
||||
|
@ -68,11 +66,17 @@ func serviceHealthSnapshot(db ReadDB, topic stream.Topic) stream.SnapshotFunc {
|
|||
event := stream.Event{
|
||||
Index: idx,
|
||||
Topic: topic,
|
||||
Payload: EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &n,
|
||||
},
|
||||
}
|
||||
payload := EventPayloadCheckServiceNode{
|
||||
Op: pbsubscribe.CatalogOp_Register,
|
||||
Value: &n,
|
||||
}
|
||||
|
||||
if connect && n.Service.Kind == structs.ServiceKindConnectProxy {
|
||||
payload.key = n.Service.Proxy.DestinationServiceName
|
||||
}
|
||||
|
||||
event.Payload = payload
|
||||
|
||||
// append each event as a separate item so that they can be serialized
|
||||
// separately, to prevent the encoding of one massive message.
|
||||
|
|
|
@ -8,15 +8,171 @@ import (
|
|||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
func TestServiceHealthSnapshot(t *testing.T) {
|
||||
store := NewStateStore(nil)
|
||||
|
||||
counter := newIndexCounter()
|
||||
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
|
||||
require.NoError(t, err)
|
||||
|
||||
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealth)
|
||||
buf := &snapshotAppender{}
|
||||
req := stream.SubscribeRequest{Key: "web"}
|
||||
|
||||
idx, err := fn(req, buf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, counter.Last(), idx)
|
||||
|
||||
expected := [][]stream.Event{
|
||||
{
|
||||
testServiceHealthEvent(t, "web", func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Node.CreateIndex = 1
|
||||
csn.Node.ModifyIndex = 1
|
||||
csn.Service.CreateIndex = 2
|
||||
csn.Service.ModifyIndex = 2
|
||||
csn.Checks[0].CreateIndex = 1
|
||||
csn.Checks[0].ModifyIndex = 1
|
||||
csn.Checks[1].CreateIndex = 2
|
||||
csn.Checks[1].ModifyIndex = 2
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
testServiceHealthEvent(t, "web", evNode2, func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Node.CreateIndex = 3
|
||||
csn.Node.ModifyIndex = 3
|
||||
csn.Service.CreateIndex = 3
|
||||
csn.Service.ModifyIndex = 3
|
||||
for i := range csn.Checks {
|
||||
csn.Checks[i].CreateIndex = 3
|
||||
csn.Checks[i].ModifyIndex = 3
|
||||
}
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, buf.events, cmpEvents)
|
||||
}
|
||||
|
||||
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
|
||||
store := NewStateStore(nil)
|
||||
|
||||
counter := newIndexCounter()
|
||||
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web"))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regSidecar))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2))
|
||||
require.NoError(t, err)
|
||||
err = store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "web", regNode2, regSidecar))
|
||||
require.NoError(t, err)
|
||||
|
||||
fn := serviceHealthSnapshot((*readDB)(store.db.db), topicServiceHealthConnect)
|
||||
buf := &snapshotAppender{}
|
||||
req := stream.SubscribeRequest{Key: "web", Topic: topicServiceHealthConnect}
|
||||
|
||||
idx, err := fn(req, buf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, counter.Last(), idx)
|
||||
|
||||
expected := [][]stream.Event{
|
||||
{
|
||||
testServiceHealthEvent(t, "web", evSidecar, evConnectTopic, func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
ep := e.Payload.(EventPayloadCheckServiceNode)
|
||||
ep.key = "web"
|
||||
e.Payload = ep
|
||||
csn := ep.Value
|
||||
csn.Node.CreateIndex = 1
|
||||
csn.Node.ModifyIndex = 1
|
||||
csn.Service.CreateIndex = 3
|
||||
csn.Service.ModifyIndex = 3
|
||||
csn.Checks[0].CreateIndex = 1
|
||||
csn.Checks[0].ModifyIndex = 1
|
||||
csn.Checks[1].CreateIndex = 3
|
||||
csn.Checks[1].ModifyIndex = 3
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
{
|
||||
testServiceHealthEvent(t, "web", evNode2, evSidecar, evConnectTopic, func(e *stream.Event) error {
|
||||
e.Index = counter.Last()
|
||||
ep := e.Payload.(EventPayloadCheckServiceNode)
|
||||
ep.key = "web"
|
||||
e.Payload = ep
|
||||
csn := ep.Value
|
||||
csn.Node.CreateIndex = 4
|
||||
csn.Node.ModifyIndex = 4
|
||||
csn.Service.CreateIndex = 5
|
||||
csn.Service.ModifyIndex = 5
|
||||
csn.Checks[0].CreateIndex = 4
|
||||
csn.Checks[0].ModifyIndex = 4
|
||||
csn.Checks[1].CreateIndex = 5
|
||||
csn.Checks[1].ModifyIndex = 5
|
||||
return nil
|
||||
}),
|
||||
},
|
||||
}
|
||||
assertDeepEqual(t, expected, buf.events, cmpEvents)
|
||||
}
|
||||
|
||||
type snapshotAppender struct {
|
||||
events [][]stream.Event
|
||||
}
|
||||
|
||||
func (s *snapshotAppender) Append(events []stream.Event) {
|
||||
s.events = append(s.events, events)
|
||||
}
|
||||
|
||||
type indexCounter struct {
|
||||
value uint64
|
||||
}
|
||||
|
||||
func (c *indexCounter) Next() uint64 {
|
||||
c.value++
|
||||
return c.value
|
||||
}
|
||||
|
||||
func (c *indexCounter) Last() uint64 {
|
||||
return c.value
|
||||
}
|
||||
|
||||
func newIndexCounter() *indexCounter {
|
||||
return &indexCounter{}
|
||||
}
|
||||
|
||||
var _ stream.SnapshotAppender = (*snapshotAppender)(nil)
|
||||
|
||||
func evIndexes(idx, create, modify uint64) func(e *stream.Event) error {
|
||||
return func(e *stream.Event) error {
|
||||
e.Index = idx
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Node.CreateIndex = create
|
||||
csn.Node.ModifyIndex = modify
|
||||
csn.Service.CreateIndex = create
|
||||
csn.Service.ModifyIndex = modify
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceHealthEventsFromChanges(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
|
|
|
@ -0,0 +1,462 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
gogrpc "google.golang.org/grpc"
|
||||
grpcresolver "google.golang.org/grpc/resolver"
|
||||
|
||||
grpc "github.com/hashicorp/consul/agent/grpc"
|
||||
"github.com/hashicorp/consul/agent/grpc/resolver"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
)
|
||||
|
||||
func TestSubscribeBackend_IntegrationWithServer_TLSEnabled(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
_, conf1 := testServerConfig(t)
|
||||
conf1.VerifyIncoming = true
|
||||
conf1.VerifyOutgoing = true
|
||||
conf1.RPCConfig.EnableStreaming = true
|
||||
configureTLS(conf1)
|
||||
server, err := newServer(t, conf1)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
client, builder := newClientWithGRPCResolver(t, configureTLS, clientConfigVerifyOutgoing)
|
||||
|
||||
// Try to join
|
||||
testrpc.WaitForLeader(t, server.RPC, "dc1")
|
||||
joinLAN(t, client, server)
|
||||
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
||||
|
||||
// Register a dummy node with our service on it.
|
||||
{
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis1",
|
||||
Service: "redis",
|
||||
Address: "3.4.5.6",
|
||||
Port: 8080,
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
require.NoError(t, server.RPC("Catalog.Register", &req, &out))
|
||||
}
|
||||
|
||||
// Start a Subscribe call to our streaming endpoint from the client.
|
||||
{
|
||||
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
||||
conn, err := pool.ClientConn("dc1")
|
||||
require.NoError(t, err)
|
||||
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
||||
streamHandle, err := streamClient.Subscribe(ctx, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start a goroutine to read updates off the pbsubscribe.
|
||||
eventCh := make(chan *pbsubscribe.Event, 0)
|
||||
go receiveSubscribeEvents(t, eventCh, streamHandle)
|
||||
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
snapshotEvents = append(snapshotEvents, event)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("did not receive events past %d", len(snapshotEvents))
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the snapshot events come back with no issues.
|
||||
require.Len(t, snapshotEvents, 2)
|
||||
}
|
||||
|
||||
// Start a Subscribe call to our streaming endpoint from the server's loopback client.
|
||||
{
|
||||
|
||||
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
||||
conn, err := pool.ClientConn("dc1")
|
||||
require.NoError(t, err)
|
||||
|
||||
retryFailedConn(t, conn)
|
||||
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
||||
streamHandle, err := streamClient.Subscribe(ctx, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Start a goroutine to read updates off the pbsubscribe.
|
||||
eventCh := make(chan *pbsubscribe.Event, 0)
|
||||
go receiveSubscribeEvents(t, eventCh, streamHandle)
|
||||
|
||||
var snapshotEvents []*pbsubscribe.Event
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case event := <-eventCh:
|
||||
snapshotEvents = append(snapshotEvents, event)
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatalf("did not receive events past %d", len(snapshotEvents))
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure the snapshot events come back with no issues.
|
||||
require.Len(t, snapshotEvents, 2)
|
||||
}
|
||||
}
|
||||
|
||||
// receiveSubscribeEvents and send them to the channel.
|
||||
func receiveSubscribeEvents(t *testing.T, ch chan *pbsubscribe.Event, handle pbsubscribe.StateChangeSubscription_SubscribeClient) {
|
||||
for {
|
||||
event, err := handle.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "context deadline exceeded") ||
|
||||
strings.Contains(err.Error(), "context canceled") {
|
||||
break
|
||||
}
|
||||
t.Log(err)
|
||||
}
|
||||
ch <- event
|
||||
}
|
||||
}
|
||||
|
||||
func TestSubscribeBackend_IntegrationWithServer_TLSReload(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Set up a server with initially bad certificates.
|
||||
_, conf1 := testServerConfig(t)
|
||||
conf1.VerifyIncoming = true
|
||||
conf1.VerifyOutgoing = true
|
||||
conf1.CAFile = "../../test/ca/root.cer"
|
||||
conf1.CertFile = "../../test/key/ssl-cert-snakeoil.pem"
|
||||
conf1.KeyFile = "../../test/key/ssl-cert-snakeoil.key"
|
||||
conf1.RPCConfig.EnableStreaming = true
|
||||
|
||||
server, err := newServer(t, conf1)
|
||||
require.NoError(t, err)
|
||||
defer server.Shutdown()
|
||||
|
||||
// Set up a client with valid certs and verify_outgoing = true
|
||||
client, builder := newClientWithGRPCResolver(t, configureTLS, clientConfigVerifyOutgoing)
|
||||
|
||||
testrpc.WaitForLeader(t, server.RPC, "dc1")
|
||||
|
||||
// Subscribe calls should fail initially
|
||||
joinLAN(t, client, server)
|
||||
|
||||
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
||||
conn, err := pool.ClientConn("dc1")
|
||||
require.NoError(t, err)
|
||||
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
||||
_, err = streamClient.Subscribe(ctx, req)
|
||||
require.Error(t, err)
|
||||
|
||||
// Reload the server with valid certs
|
||||
newConf := server.config.ToTLSUtilConfig()
|
||||
newConf.CertFile = "../../test/key/ourdomain.cer"
|
||||
newConf.KeyFile = "../../test/key/ourdomain.key"
|
||||
server.tlsConfigurator.Update(newConf)
|
||||
|
||||
// Try the subscribe call again
|
||||
retryFailedConn(t, conn)
|
||||
|
||||
streamClient = pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
_, err = streamClient.Subscribe(ctx, req)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func clientConfigVerifyOutgoing(config *Config) {
|
||||
config.VerifyOutgoing = true
|
||||
}
|
||||
|
||||
// retryFailedConn forces the ClientConn to reset its backoff timer and retry the connection,
|
||||
// to simulate the client eventually retrying after the initial failure. This is used both to simulate
|
||||
// retrying after an expected failure as well as to avoid flakiness when running many tests in parallel.
|
||||
func retryFailedConn(t *testing.T, conn *gogrpc.ClientConn) {
|
||||
state := conn.GetState()
|
||||
if state.String() != "TRANSIENT_FAILURE" {
|
||||
return
|
||||
}
|
||||
|
||||
// If the connection has failed, retry and wait for a state change.
|
||||
conn.ResetConnectBackoff()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
require.True(t, conn.WaitForStateChange(ctx, state))
|
||||
}
|
||||
|
||||
func TestSubscribeBackend_IntegrationWithServer_DeliversAllMessages(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for -short run")
|
||||
}
|
||||
// This is a fuzz/probabilistic test to try to provoke streaming into dropping
|
||||
// messages. There is a bug in the initial implementation that should make
|
||||
// this fail. While we can't be certain a pass means it's correct, it is
|
||||
// useful for finding bugs in our concurrency design.
|
||||
|
||||
// The issue is that when updates are coming in fast such that updates occur
|
||||
// in between us making the snapshot and beginning the stream updates, we
|
||||
// shouldn't miss anything.
|
||||
|
||||
// To test this, we will run a background goroutine that will write updates as
|
||||
// fast as possible while we then try to stream the results and ensure that we
|
||||
// see every change. We'll make the updates monotonically increasing so we can
|
||||
// easily tell if we missed one.
|
||||
|
||||
_, server := testServerWithConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.Bootstrap = true
|
||||
c.RPCConfig.EnableStreaming = true
|
||||
})
|
||||
defer server.Shutdown()
|
||||
codec := rpcClient(t, server)
|
||||
defer codec.Close()
|
||||
|
||||
client, builder := newClientWithGRPCResolver(t)
|
||||
|
||||
// Try to join
|
||||
testrpc.WaitForLeader(t, server.RPC, "dc1")
|
||||
joinLAN(t, client, server)
|
||||
testrpc.WaitForTestAgent(t, client.RPC, "dc1")
|
||||
|
||||
// Register a whole bunch of service instances so that the initial snapshot on
|
||||
// subscribe is big enough to take a bit of time to load giving more
|
||||
// opportunity for missed updates if there is a bug.
|
||||
for i := 0; i < 1000; i++ {
|
||||
req := &structs.RegisterRequest{
|
||||
Node: fmt.Sprintf("node-redis-%03d", i),
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: fmt.Sprintf("redis-%03d", i),
|
||||
Service: "redis",
|
||||
Port: 11211,
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
require.NoError(t, server.RPC("Catalog.Register", &req, &out))
|
||||
}
|
||||
|
||||
// Start background writer
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
// Update the registration with a monotonically increasing port as fast as
|
||||
// we can.
|
||||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "3.4.5.6",
|
||||
Datacenter: "dc1",
|
||||
Service: &structs.NodeService{
|
||||
ID: "redis-canary",
|
||||
Service: "redis",
|
||||
Port: 0,
|
||||
},
|
||||
}
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
var out struct{}
|
||||
require.NoError(t, server.RPC("Catalog.Register", &req, &out))
|
||||
req.Service.Port++
|
||||
if req.Service.Port > 100 {
|
||||
return
|
||||
}
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
}()
|
||||
|
||||
pool := grpc.NewClientConnPool(builder, grpc.TLSWrapper(client.tlsConfigurator.OutgoingRPCWrapper()), client.tlsConfigurator.UseTLS)
|
||||
conn, err := pool.ClientConn("dc1")
|
||||
require.NoError(t, err)
|
||||
|
||||
streamClient := pbsubscribe.NewStateChangeSubscriptionClient(conn)
|
||||
|
||||
// Now start a whole bunch of streamers in parallel to maximise chance of
|
||||
// catching a race.
|
||||
n := 5
|
||||
var wg sync.WaitGroup
|
||||
var updateCount uint64
|
||||
// Buffered error chan so that workers can exit and terminate wg without
|
||||
// blocking on send. We collect errors this way since t isn't thread safe.
|
||||
errCh := make(chan error, n)
|
||||
for i := 0; i < n; i++ {
|
||||
i := i
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
verifyMonotonicStreamUpdates(ctx, t, streamClient, i, &updateCount, errCh)
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait until all subscribers have verified the first bunch of updates all got
|
||||
// delivered.
|
||||
wg.Wait()
|
||||
|
||||
close(errCh)
|
||||
|
||||
// Require that none of them errored. Since we closed the chan above this loop
|
||||
// should terminate immediately if no errors were buffered.
|
||||
for err := range errCh {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
// Sanity check that at least some non-snapshot messages were delivered. We
|
||||
// can't know exactly how many because it's timing dependent based on when
|
||||
// each subscribers snapshot occurs.
|
||||
require.True(t, atomic.LoadUint64(&updateCount) > 0,
|
||||
"at least some of the subscribers should have received non-snapshot updates")
|
||||
}
|
||||
|
||||
func newClientWithGRPCResolver(t *testing.T, ops ...func(*Config)) (*Client, *resolver.ServerResolverBuilder) {
|
||||
builder := resolver.NewServerResolverBuilder(resolver.Config{Scheme: t.Name()})
|
||||
registerWithGRPC(builder)
|
||||
|
||||
_, config := testClientConfig(t)
|
||||
for _, op := range ops {
|
||||
op(config)
|
||||
}
|
||||
|
||||
deps := newDefaultDeps(t, config)
|
||||
deps.Router = router.NewRouter(
|
||||
deps.Logger,
|
||||
config.Datacenter,
|
||||
fmt.Sprintf("%s.%s", config.NodeName, config.Datacenter),
|
||||
builder)
|
||||
|
||||
client, err := NewClient(config, deps)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
client.Shutdown()
|
||||
})
|
||||
return client, builder
|
||||
}
|
||||
|
||||
var grpcRegisterLock sync.Mutex
|
||||
|
||||
// registerWithGRPC registers the grpc/resolver.Builder as a grpc/resolver.
|
||||
// This function exists to synchronize registrations with a lock.
|
||||
// grpc/resolver.Register expects all registration to happen at init and does
|
||||
// not allow for concurrent registration. This function exists to support
|
||||
// parallel testing.
|
||||
func registerWithGRPC(b grpcresolver.Builder) {
|
||||
grpcRegisterLock.Lock()
|
||||
defer grpcRegisterLock.Unlock()
|
||||
grpcresolver.Register(b)
|
||||
}
|
||||
|
||||
type testLogger interface {
|
||||
Logf(format string, args ...interface{})
|
||||
}
|
||||
|
||||
func verifyMonotonicStreamUpdates(ctx context.Context, logger testLogger, client pbsubscribe.StateChangeSubscriptionClient, i int, updateCount *uint64, errCh chan<- error) {
|
||||
req := &pbsubscribe.SubscribeRequest{Topic: pbsubscribe.Topic_ServiceHealth, Key: "redis"}
|
||||
streamHandle, err := client.Subscribe(ctx, req)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "context deadline exceeded") ||
|
||||
strings.Contains(err.Error(), "context canceled") {
|
||||
logger.Logf("subscriber %05d: context cancelled before loop")
|
||||
return
|
||||
}
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
snapshotDone := false
|
||||
expectPort := int32(0)
|
||||
for {
|
||||
event, err := streamHandle.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "context deadline exceeded") ||
|
||||
strings.Contains(err.Error(), "context canceled") {
|
||||
break
|
||||
}
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
switch {
|
||||
case event.GetEndOfSnapshot():
|
||||
snapshotDone = true
|
||||
logger.Logf("subscriber %05d: snapshot done, expect next port to be %d", i, expectPort)
|
||||
case snapshotDone:
|
||||
// Verify we get all updates in order
|
||||
svc, err := svcOrErr(event)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if expectPort != svc.Port {
|
||||
errCh <- fmt.Errorf("subscriber %05d: missed %d update(s)!", i, svc.Port-expectPort)
|
||||
return
|
||||
}
|
||||
atomic.AddUint64(updateCount, 1)
|
||||
logger.Logf("subscriber %05d: got event with correct port=%d", i, expectPort)
|
||||
expectPort++
|
||||
default:
|
||||
// This is a snapshot update. Check if it's an update for the canary
|
||||
// instance that got applied before our snapshot was sent (likely)
|
||||
svc, err := svcOrErr(event)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
if svc.ID == "redis-canary" {
|
||||
// Update the expected port we see in the next update to be one more
|
||||
// than the port in the snapshot.
|
||||
expectPort = svc.Port + 1
|
||||
logger.Logf("subscriber %05d: saw canary in snapshot with port %d", i, svc.Port)
|
||||
}
|
||||
}
|
||||
if expectPort > 100 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func svcOrErr(event *pbsubscribe.Event) (*pbservice.NodeService, error) {
|
||||
health := event.GetServiceHealth()
|
||||
if health == nil {
|
||||
return nil, fmt.Errorf("not a health event: %#v", event)
|
||||
}
|
||||
csn := health.CheckServiceNode
|
||||
if csn == nil {
|
||||
return nil, fmt.Errorf("nil CSN: %#v", event)
|
||||
}
|
||||
if csn.Service == nil {
|
||||
return nil, fmt.Errorf("nil service: %#v", event)
|
||||
}
|
||||
return csn.Service, nil
|
||||
}
|
|
@ -10,14 +10,17 @@ import (
|
|||
"reflect"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestHealthChecksInState(t *testing.T) {
|
||||
|
@ -717,6 +720,12 @@ func TestHealthServiceNodes(t *testing.T) {
|
|||
if len(nodes) != 2 {
|
||||
r.Fatalf("Want 2 nodes")
|
||||
}
|
||||
header := resp.Header().Get("X-Consul-Index")
|
||||
if header == "" || header == "0" {
|
||||
r.Fatalf("Want non-zero header: %q", header)
|
||||
}
|
||||
_, err = strconv.ParseUint(header, 10, 64)
|
||||
r.Check(err)
|
||||
|
||||
// Should be a cache hit! The data should've updated in the cache
|
||||
// in the background so this should've been fetched directly from
|
||||
|
@ -728,6 +737,166 @@ func TestHealthServiceNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceNodes_Blocking(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
hcl string
|
||||
grpcMetrics bool
|
||||
}{
|
||||
{name: "no streaming"},
|
||||
{
|
||||
name: "streaming",
|
||||
grpcMetrics: true,
|
||||
hcl: `
|
||||
rpc { enable_streaming = true }
|
||||
use_streaming_backend = true
|
||||
`,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
sink := metrics.NewInmemSink(5*time.Second, time.Minute)
|
||||
metrics.NewGlobal(&metrics.Config{
|
||||
ServiceName: "testing",
|
||||
AllowedPrefixes: []string{"testing.grpc."},
|
||||
}, sink)
|
||||
|
||||
a := NewTestAgent(t, tc.hcl)
|
||||
defer a.Shutdown()
|
||||
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
|
||||
|
||||
// Register some initial service instances
|
||||
for i := 0; i < 2; i++ {
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
ID: fmt.Sprintf("test%03d", i),
|
||||
Service: "test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
require.NoError(t, a.RPC("Catalog.Register", args, &out))
|
||||
}
|
||||
|
||||
// Initial request should return two instances
|
||||
req, _ := http.NewRequest("GET", "/v1/health/service/test?dc=dc1", nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||
require.NoError(t, err)
|
||||
|
||||
nodes := obj.(structs.CheckServiceNodes)
|
||||
require.Len(t, nodes, 2)
|
||||
|
||||
idx := getIndex(t, resp)
|
||||
require.True(t, idx > 0)
|
||||
|
||||
// errCh collects errors from goroutines since it's unsafe for them to use
|
||||
// t to fail tests directly.
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
checkErrs := func() {
|
||||
// Ensure no errors were sent on errCh and drain any nils we have
|
||||
for {
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Blocking on that index should block. We test that by launching another
|
||||
// goroutine that will wait a while before updating the registration and
|
||||
// make sure that we unblock before timeout and see the update but that it
|
||||
// takes at least as long as the sleep time.
|
||||
sleep := 200 * time.Millisecond
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(sleep)
|
||||
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "zoo",
|
||||
Address: "127.0.0.3",
|
||||
Service: &structs.NodeService{
|
||||
ID: "test",
|
||||
Service: "test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
errCh <- a.RPC("Catalog.Register", args, &out)
|
||||
}()
|
||||
|
||||
{
|
||||
timeout := 30 * time.Second
|
||||
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s", idx, timeout)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||
require.NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
require.True(t, elapsed > sleep, "request should block for at "+
|
||||
" least as long as sleep. sleep=%s, elapsed=%s", sleep, elapsed)
|
||||
|
||||
require.True(t, elapsed < timeout, "request should unblock before"+
|
||||
" it timed out. timeout=%s, elapsed=%s", timeout, elapsed)
|
||||
|
||||
nodes := obj.(structs.CheckServiceNodes)
|
||||
require.Len(t, nodes, 3)
|
||||
|
||||
newIdx := getIndex(t, resp)
|
||||
require.True(t, idx < newIdx, "index should have increased."+
|
||||
"idx=%d, newIdx=%d", idx, newIdx)
|
||||
|
||||
idx = newIdx
|
||||
|
||||
checkErrs()
|
||||
}
|
||||
|
||||
// Blocking should last until timeout in absence of updates
|
||||
start = time.Now()
|
||||
{
|
||||
timeout := 200 * time.Millisecond
|
||||
url := fmt.Sprintf("/v1/health/service/test?dc=dc1&index=%d&wait=%s",
|
||||
idx, timeout)
|
||||
req, _ := http.NewRequest("GET", url, nil)
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := a.srv.HealthServiceNodes(resp, req)
|
||||
require.NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
// Note that servers add jitter to timeout requested but don't remove it
|
||||
// so this should always be true.
|
||||
require.True(t, elapsed > timeout, "request should block for at "+
|
||||
" least as long as timeout. timeout=%s, elapsed=%s", timeout, elapsed)
|
||||
|
||||
nodes := obj.(structs.CheckServiceNodes)
|
||||
require.Len(t, nodes, 3)
|
||||
|
||||
newIdx := getIndex(t, resp)
|
||||
require.Equal(t, idx, newIdx)
|
||||
}
|
||||
|
||||
if tc.grpcMetrics {
|
||||
data := sink.Data()
|
||||
if l := len(data); l < 1 {
|
||||
t.Errorf("expected at least 1 metrics interval, got :%v", l)
|
||||
}
|
||||
if count := len(data[0].Gauges); count < 2 {
|
||||
t.Errorf("expected at least 2 grpc gauge metrics, got: %v", count)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -43,7 +43,7 @@ func newLoggerForRequest(l Logger, req *pbsubscribe.SubscribeRequest) Logger {
|
|||
"dc", req.Datacenter,
|
||||
"key", req.Key,
|
||||
"namespace", req.Namespace,
|
||||
"index", req.Index,
|
||||
"request_index", req.Index,
|
||||
"stream_id", &streamID{})
|
||||
}
|
||||
|
||||
|
|
|
@ -21,8 +21,6 @@ import (
|
|||
uuid "github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/config"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
@ -30,6 +28,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/freeport"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
)
|
||||
|
@ -55,8 +54,7 @@ type TestAgent struct {
|
|||
// when Shutdown() is called.
|
||||
Config *config.RuntimeConfig
|
||||
|
||||
// LogOutput is the sink for the logs. If nil, logs are written
|
||||
// to os.Stderr.
|
||||
// LogOutput is the sink for the logs. If nil, logs are written to os.Stderr.
|
||||
LogOutput io.Writer
|
||||
|
||||
// DataDir may be set to a directory which exists. If is it not set,
|
||||
|
|
|
@ -2213,7 +2213,7 @@ bind_addr = "{{ GetPrivateInterfaces | include \"network\" \"10.0.0.0/8\" | attr
|
|||
streaming rpc, instead of the traditional blocking queries, for endpoints which support
|
||||
streaming. All servers must have [`rpc.enable_streaming`](#rpc_enable_streaming)
|
||||
enabled before any client can enable `use_streaming_backend`.
|
||||
At least one of [`dns.use_cache`](#dns_use_cache) or
|
||||
At least one of [`dns_config.use_cache`](#dns_use_cache) or
|
||||
[`http_config.use_cache`](#http_config_use_cache) must be enabled, otherwise
|
||||
this setting has no effect.
|
||||
`use_streaming_backend` will default to true in a future version of Consul.
|
||||
|
|
|
@ -280,10 +280,10 @@ These metrics are used to monitor the health of the Consul servers.
|
|||
| `consul.txn.apply` | This measures the time spent applying a transaction operation. | ms | timer |
|
||||
| `consul.txn.read` | This measures the time spent returning a read transaction. | ms | timer |
|
||||
| `consul.grpc.client.request.count` | This metric counts the number of gRPC requests made by the client agent to a Consul server. | requests | counter |
|
||||
| `consul.grpc.client.connect.count` | This metric counts the number of new gRPC connections opened by the client agent to a Consul server. | connections | counter |
|
||||
| `consul.grpc.client.connection.count` | This metric counts the number of new gRPC connections opened by the client agent to a Consul server. | connections | counter |
|
||||
| `consul.grpc.client.connections` | This metric measures the number of active gRPC connections open from the client agent to any Consul servers. | connections | gauge |
|
||||
| `consul.grpc.server.request.count` | This metric counts the number of gRPC requests received by the server. | requests | counter |
|
||||
| `consul.grpc.server.connect.count` | This metric counts the number of new gRPC connections received by the server. | connections | counter |
|
||||
| `consul.grpc.server.connection.count` | This metric counts the number of new gRPC connections received by the server. | connections | counter |
|
||||
| `consul.grpc.server.connections` | This metric measures the number of active gRPC connections open on the server. | connections | gauge |
|
||||
| `consul.grpc.server.stream.count` | This metric counts the number of new gRPC streams received by the server. | streams | counter |
|
||||
| `consul.grpc.server.streams` | This metric measures the number of active gRPC streams handled by the server. | streams | guage |
|
||||
|
|
Loading…
Reference in New Issue