From 7688178ad20f196797b6b51157592f50aa734c35 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Fri, 22 Sep 2023 13:12:00 -0500 Subject: [PATCH] peerstream: fix flaky test related to autopilot integration (#18979) --- .../services/peerstream/stream_test.go | 12 ++-- .../peerstream/subscription_manager_test.go | 62 +++++++++---------- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index 8c1e0783bd..ad3399b79f 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -1434,10 +1434,6 @@ func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient { receivedSub3, err := client.Recv() require.NoError(t, err) - // This is required when the client subscribes to server address replication messages. - // We assert for the handler to be called at least once but the data doesn't matter. - srv.mockSnapshotHandler.expect("", 0, 0, nil) - // Issue services, roots, and server address subscription to server. // Note that server address may not come as an initial message for _, resourceURL := range []string{ @@ -3060,9 +3056,9 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) type testServer struct { *Server - // mockSnapshotHandler is solely used for handling autopilot events + // readyServersSnapshotHandler is solely used for handling autopilot events // which don't come from the state store. - mockSnapshotHandler *mockSnapshotHandler + readyServersSnapshotHandler *dummyReadyServersSnapshotHandler } func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) { @@ -3104,8 +3100,8 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state. t.Cleanup(grpcServer.Stop) return &testServer{ - Server: srv, - mockSnapshotHandler: handler, + Server: srv, + readyServersSnapshotHandler: handler, }, store } diff --git a/agent/grpc-external/services/peerstream/subscription_manager_test.go b/agent/grpc-external/services/peerstream/subscription_manager_test.go index cc2afee0d5..2abd40be95 100644 --- a/agent/grpc-external/services/peerstream/subscription_manager_test.go +++ b/agent/grpc-external/services/peerstream/subscription_manager_test.go @@ -5,12 +5,12 @@ package peerstream import ( "context" + "fmt" "sort" "sync" "testing" "time" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" @@ -681,7 +681,7 @@ func TestSubscriptionManager_ServerAddrs(t *testing.T) { }, } // mock handler only gets called once during the initial subscription - backend.handler.expect("", 0, 1, payload) + backend.handler.SetPayload(1, payload) // Only configure a tracker for server address events. tracker := newResourceSubscriptionTracker() @@ -1016,7 +1016,7 @@ func TestFlattenChecks(t *testing.T) { type testSubscriptionBackend struct { state.EventPublisher store *state.Store - handler *mockSnapshotHandler + handler *dummyReadyServersSnapshotHandler lastIdx uint64 } @@ -1133,11 +1133,11 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6 return p.ID } -func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *mockSnapshotHandler) { +func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *dummyReadyServersSnapshotHandler) { gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) require.NoError(t, err) - handler := newMockSnapshotHandler(t) + handler := &dummyReadyServersSnapshotHandler{} store := state.NewStateStoreWithEventPublisher(gc, publisher) require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false)) @@ -1297,38 +1297,34 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe } } -// mockSnapshotHandler is copied from server_discovery/server_test.go -type mockSnapshotHandler struct { - mock.Mock +type dummyReadyServersSnapshotHandler struct { + lock sync.Mutex + eventIndex uint64 + payload autopilotevents.EventPayloadReadyServers } -func newMockSnapshotHandler(t *testing.T) *mockSnapshotHandler { - handler := &mockSnapshotHandler{} - t.Cleanup(func() { - handler.AssertExpectations(t) - }) - return handler +func (h *dummyReadyServersSnapshotHandler) SetPayload(idx uint64, payload autopilotevents.EventPayloadReadyServers) { + h.lock.Lock() + defer h.lock.Unlock() + h.eventIndex = idx + h.payload = payload } -func (m *mockSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { - ret := m.Called(req, buf) - return ret.Get(0).(uint64), ret.Error(1) -} +func (h *dummyReadyServersSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { + if req.Topic != autopilotevents.EventTopicReadyServers { + return 0, fmt.Errorf("bad request") + } + if req.Subject != stream.SubjectNone { + return 0, fmt.Errorf("bad request") + } -func (m *mockSnapshotHandler) expect(token string, requestIndex uint64, eventIndex uint64, payload autopilotevents.EventPayloadReadyServers) { - m.On("handle", stream.SubscribeRequest{ + h.lock.Lock() + defer h.lock.Unlock() + buf.Append([]stream.Event{{ Topic: autopilotevents.EventTopicReadyServers, - Subject: stream.SubjectNone, - Token: token, - Index: requestIndex, - }, mock.Anything).Run(func(args mock.Arguments) { - buf := args.Get(1).(stream.SnapshotAppender) - buf.Append([]stream.Event{ - { - Topic: autopilotevents.EventTopicReadyServers, - Index: eventIndex, - Payload: payload, - }, - }) - }).Return(eventIndex, nil) + Index: h.eventIndex, + Payload: h.payload, + }}) + + return h.eventIndex, nil }