|
|
@ -5,12 +5,12 @@ package peerstream |
|
|
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"context" |
|
|
|
|
|
|
|
"fmt" |
|
|
|
"sort" |
|
|
|
"sort" |
|
|
|
"sync" |
|
|
|
"sync" |
|
|
|
"testing" |
|
|
|
"testing" |
|
|
|
"time" |
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/stretchr/testify/mock" |
|
|
|
|
|
|
|
"github.com/stretchr/testify/require" |
|
|
|
"github.com/stretchr/testify/require" |
|
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/acl" |
|
|
|
"github.com/hashicorp/consul/acl" |
|
|
@ -681,7 +681,7 @@ func TestSubscriptionManager_ServerAddrs(t *testing.T) { |
|
|
|
}, |
|
|
|
}, |
|
|
|
} |
|
|
|
} |
|
|
|
// mock handler only gets called once during the initial subscription
|
|
|
|
// mock handler only gets called once during the initial subscription
|
|
|
|
backend.handler.expect("", 0, 1, payload) |
|
|
|
backend.handler.SetPayload(1, payload) |
|
|
|
|
|
|
|
|
|
|
|
// Only configure a tracker for server address events.
|
|
|
|
// Only configure a tracker for server address events.
|
|
|
|
tracker := newResourceSubscriptionTracker() |
|
|
|
tracker := newResourceSubscriptionTracker() |
|
|
@ -1016,7 +1016,7 @@ func TestFlattenChecks(t *testing.T) { |
|
|
|
type testSubscriptionBackend struct { |
|
|
|
type testSubscriptionBackend struct { |
|
|
|
state.EventPublisher |
|
|
|
state.EventPublisher |
|
|
|
store *state.Store |
|
|
|
store *state.Store |
|
|
|
handler *mockSnapshotHandler |
|
|
|
handler *dummyReadyServersSnapshotHandler |
|
|
|
|
|
|
|
|
|
|
|
lastIdx uint64 |
|
|
|
lastIdx uint64 |
|
|
|
} |
|
|
|
} |
|
|
@ -1133,11 +1133,11 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6 |
|
|
|
return p.ID |
|
|
|
return p.ID |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *mockSnapshotHandler) { |
|
|
|
func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *dummyReadyServersSnapshotHandler) { |
|
|
|
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) |
|
|
|
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) |
|
|
|
require.NoError(t, err) |
|
|
|
require.NoError(t, err) |
|
|
|
|
|
|
|
|
|
|
|
handler := newMockSnapshotHandler(t) |
|
|
|
handler := &dummyReadyServersSnapshotHandler{} |
|
|
|
|
|
|
|
|
|
|
|
store := state.NewStateStoreWithEventPublisher(gc, publisher) |
|
|
|
store := state.NewStateStoreWithEventPublisher(gc, publisher) |
|
|
|
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false)) |
|
|
|
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false)) |
|
|
@ -1297,38 +1297,34 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// mockSnapshotHandler is copied from server_discovery/server_test.go
|
|
|
|
type dummyReadyServersSnapshotHandler struct { |
|
|
|
type mockSnapshotHandler struct { |
|
|
|
lock sync.Mutex |
|
|
|
mock.Mock |
|
|
|
eventIndex uint64 |
|
|
|
|
|
|
|
payload autopilotevents.EventPayloadReadyServers |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func newMockSnapshotHandler(t *testing.T) *mockSnapshotHandler { |
|
|
|
func (h *dummyReadyServersSnapshotHandler) SetPayload(idx uint64, payload autopilotevents.EventPayloadReadyServers) { |
|
|
|
handler := &mockSnapshotHandler{} |
|
|
|
h.lock.Lock() |
|
|
|
t.Cleanup(func() { |
|
|
|
defer h.lock.Unlock() |
|
|
|
handler.AssertExpectations(t) |
|
|
|
h.eventIndex = idx |
|
|
|
}) |
|
|
|
h.payload = payload |
|
|
|
return handler |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (m *mockSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { |
|
|
|
func (h *dummyReadyServersSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) { |
|
|
|
ret := m.Called(req, buf) |
|
|
|
if req.Topic != autopilotevents.EventTopicReadyServers { |
|
|
|
return ret.Get(0).(uint64), ret.Error(1) |
|
|
|
return 0, fmt.Errorf("bad request") |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
if req.Subject != stream.SubjectNone { |
|
|
|
|
|
|
|
return 0, fmt.Errorf("bad request") |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (m *mockSnapshotHandler) expect(token string, requestIndex uint64, eventIndex uint64, payload autopilotevents.EventPayloadReadyServers) { |
|
|
|
h.lock.Lock() |
|
|
|
m.On("handle", stream.SubscribeRequest{ |
|
|
|
defer h.lock.Unlock() |
|
|
|
|
|
|
|
buf.Append([]stream.Event{{ |
|
|
|
Topic: autopilotevents.EventTopicReadyServers, |
|
|
|
Topic: autopilotevents.EventTopicReadyServers, |
|
|
|
Subject: stream.SubjectNone, |
|
|
|
Index: h.eventIndex, |
|
|
|
Token: token, |
|
|
|
Payload: h.payload, |
|
|
|
Index: requestIndex, |
|
|
|
}}) |
|
|
|
}, mock.Anything).Run(func(args mock.Arguments) { |
|
|
|
|
|
|
|
buf := args.Get(1).(stream.SnapshotAppender) |
|
|
|
return h.eventIndex, nil |
|
|
|
buf.Append([]stream.Event{ |
|
|
|
|
|
|
|
{ |
|
|
|
|
|
|
|
Topic: autopilotevents.EventTopicReadyServers, |
|
|
|
|
|
|
|
Index: eventIndex, |
|
|
|
|
|
|
|
Payload: payload, |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
}).Return(eventIndex, nil) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|