mirror of https://github.com/hashicorp/consul
submatview: test store with Get and Notify calls together
parent
c1063999fc
commit
9854c424eb
|
@ -6,11 +6,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/lib/ttlcache"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
logger hclog.Logger
|
||||
lock sync.RWMutex
|
||||
byKey map[string]entry
|
||||
expiryHeap *ttlcache.ExpiryHeap
|
||||
|
@ -25,8 +28,9 @@ type entry struct {
|
|||
requests int
|
||||
}
|
||||
|
||||
func NewStore() *Store {
|
||||
func NewStore(logger hclog.Logger) *Store {
|
||||
return &Store{
|
||||
logger: logger,
|
||||
byKey: make(map[string]entry),
|
||||
expiryHeap: ttlcache.NewExpiryHeap(),
|
||||
}
|
||||
|
@ -117,8 +121,13 @@ func (s *Store) Notify(
|
|||
return
|
||||
case err != nil:
|
||||
// TODO: cache.Notify sends errors on updateCh, should this do the same?
|
||||
// It seems like only fetch errors would ever get sent along.
|
||||
// TODO: log warning
|
||||
// It seems like only fetch errors would ever get sent along and eventually
|
||||
// logged, so sending may not provide any benefit here.
|
||||
|
||||
s.logger.Warn("handling error in Store.Notify",
|
||||
"error", err,
|
||||
"request-type", req.Type(),
|
||||
"index", index)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -10,16 +10,18 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/lib/ttlcache"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/pbsubscribe"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
func TestStore_Get(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
store := NewStore()
|
||||
store := NewStore(hclog.New(nil))
|
||||
go store.Run(ctx)
|
||||
|
||||
req := &fakeRequest{
|
||||
|
@ -126,12 +128,17 @@ type resultOrError struct {
|
|||
|
||||
type fakeRequest struct {
|
||||
index uint64
|
||||
key string
|
||||
client *TestStreamingClient
|
||||
}
|
||||
|
||||
func (r *fakeRequest) CacheInfo() cache.RequestInfo {
|
||||
key := r.key
|
||||
if key == "" {
|
||||
key = "key"
|
||||
}
|
||||
return cache.RequestInfo{
|
||||
Key: "key",
|
||||
Key: key,
|
||||
Token: "abcd",
|
||||
Datacenter: "dc1",
|
||||
Timeout: 4 * time.Second,
|
||||
|
@ -203,15 +210,186 @@ func (f *fakeView) Reset() {
|
|||
f.srvs = make(map[string]*pbservice.CheckServiceNode)
|
||||
}
|
||||
|
||||
// TODO: Get with Notify
|
||||
|
||||
func TestStore_Notify(t *testing.T) {
|
||||
// TODO: Notify with no existing entry
|
||||
// TODO: Notify with Get
|
||||
// TODO: Notify multiple times same key
|
||||
// TODO: Notify no update if index is not past MinIndex.
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(hclog.New(nil))
|
||||
go store.Run(ctx)
|
||||
|
||||
req := &fakeRequest{
|
||||
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
|
||||
}
|
||||
req.client.QueueEvents(
|
||||
newEndOfSnapshotEvent(2),
|
||||
newEventServiceHealthRegister(10, 1, "srv1"),
|
||||
newEventServiceHealthRegister(22, 2, "srv1"))
|
||||
|
||||
cID := "correlate"
|
||||
ch := make(chan cache.UpdateEvent)
|
||||
|
||||
err := store.Notify(ctx, req, cID, ch)
|
||||
require.NoError(t, err)
|
||||
|
||||
runStep(t, "from empty store, starts materializer", func(t *testing.T) {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
require.Len(t, store.byKey, 1)
|
||||
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
|
||||
require.Equal(t, ttlcache.NotIndexed, e.expiry.Index())
|
||||
require.Equal(t, 1, e.requests)
|
||||
})
|
||||
|
||||
runStep(t, "updates are received", func(t *testing.T) {
|
||||
select {
|
||||
case update := <-ch:
|
||||
require.NoError(t, update.Err)
|
||||
require.Equal(t, cID, update.CorrelationID)
|
||||
require.Equal(t, uint64(22), update.Meta.Index)
|
||||
require.Equal(t, uint64(22), update.Result.(fakeResult).index)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("expected Get to unblock when new events are received")
|
||||
}
|
||||
|
||||
req.client.QueueEvents(newEventServiceHealthRegister(24, 2, "srv1"))
|
||||
|
||||
select {
|
||||
case update := <-ch:
|
||||
require.NoError(t, update.Err)
|
||||
require.Equal(t, cID, update.CorrelationID)
|
||||
require.Equal(t, uint64(24), update.Meta.Index)
|
||||
require.Equal(t, uint64(24), update.Result.(fakeResult).index)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatalf("expected Get to unblock when new events are received")
|
||||
}
|
||||
})
|
||||
|
||||
runStep(t, "closing the notify starts the expiry counter", func(t *testing.T) {
|
||||
cancel()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
|
||||
require.Equal(r, 0, e.expiry.Index())
|
||||
require.Equal(r, 0, e.requests)
|
||||
require.Equal(r, store.expiryHeap.Next().Entry, e.expiry)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestStore_Notify_ManyRequests(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
store := NewStore(hclog.New(nil))
|
||||
go store.Run(ctx)
|
||||
|
||||
req := &fakeRequest{
|
||||
client: NewTestStreamingClient(pbcommon.DefaultEnterpriseMeta.Namespace),
|
||||
}
|
||||
req.client.QueueEvents(newEndOfSnapshotEvent(2))
|
||||
|
||||
cID := "correlate"
|
||||
ch1 := make(chan cache.UpdateEvent)
|
||||
ch2 := make(chan cache.UpdateEvent)
|
||||
|
||||
require.NoError(t, store.Notify(ctx, req, cID, ch1))
|
||||
assertRequestCount(t, store, req, 1)
|
||||
|
||||
require.NoError(t, store.Notify(ctx, req, cID, ch2))
|
||||
assertRequestCount(t, store, req, 2)
|
||||
|
||||
req.index = 15
|
||||
|
||||
go func() {
|
||||
_, _ = store.Get(ctx, req)
|
||||
}()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
assertRequestCount(r, store, req, 3)
|
||||
})
|
||||
|
||||
go func() {
|
||||
_, _ = store.Get(ctx, req)
|
||||
}()
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
assertRequestCount(r, store, req, 4)
|
||||
})
|
||||
|
||||
var req2 *fakeRequest
|
||||
|
||||
runStep(t, "Get and Notify with a different key", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
req2 = &fakeRequest{client: req.client, key: "key2"}
|
||||
|
||||
require.NoError(t, store.Notify(ctx, req2, cID, ch1))
|
||||
go func() {
|
||||
_, _ = store.Get(ctx, req2)
|
||||
}()
|
||||
|
||||
// the original entry should still be at count 4
|
||||
assertRequestCount(t, store, req, 4)
|
||||
// the new entry should be at count 2
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
assertRequestCount(r, store, req2, 2)
|
||||
})
|
||||
})
|
||||
|
||||
runStep(t, "end all the requests", func(t *testing.T) {
|
||||
req.client.QueueEvents(
|
||||
newEventServiceHealthRegister(10, 1, "srv1"),
|
||||
newEventServiceHealthRegister(12, 2, "srv1"),
|
||||
newEventServiceHealthRegister(13, 1, "srv2"),
|
||||
newEventServiceHealthRegister(16, 3, "srv2"))
|
||||
|
||||
// The two Get requests should exit now that the index has been updated
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
assertRequestCount(r, store, req, 2)
|
||||
})
|
||||
|
||||
// Cancel the context so all requests terminate
|
||||
cancel()
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
assertRequestCount(r, store, req, 0)
|
||||
})
|
||||
})
|
||||
|
||||
runStep(t, "the expiry heap should contain two entries", func(t *testing.T) {
|
||||
store.lock.Lock()
|
||||
defer store.lock.Unlock()
|
||||
e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())]
|
||||
e2 := store.byKey[makeEntryKey(req2.Type(), req2.CacheInfo())]
|
||||
require.Equal(t, 0, e2.expiry.Index())
|
||||
require.Equal(t, 1, e.expiry.Index())
|
||||
|
||||
require.Equal(t, store.expiryHeap.Next().Entry, e2.expiry)
|
||||
})
|
||||
}
|
||||
|
||||
type testingT interface {
|
||||
Helper()
|
||||
Fatalf(string, ...interface{})
|
||||
}
|
||||
|
||||
func assertRequestCount(t testingT, s *Store, req Request, expected int) {
|
||||
t.Helper()
|
||||
|
||||
key := makeEntryKey(req.Type(), req.CacheInfo())
|
||||
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
actual := s.byKey[key].requests
|
||||
if actual != expected {
|
||||
t.Fatalf("expected request count to be %d, got %d", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: test expiration
|
||||
|
||||
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
||||
t.Helper()
|
||||
if !t.Run(name, fn) {
|
||||
|
|
Loading…
Reference in New Issue