mirror of https://github.com/hashicorp/consul
Port over the index 0 -> 1 code that lived in the old rpc setQueryMeta function. (#13561)
parent
5de75550d3
commit
7a4d13b0b2
|
@ -135,6 +135,16 @@ func ServerLocalBlockingQuery[ResultType any, StoreType StateStore](
|
|||
ws.Add(state.AbandonCh())
|
||||
|
||||
index, result, err := query(ws, state)
|
||||
// Always set a non-zero index. Generally we expect the index
|
||||
// to be set to Raft index which can never be 0. If the query
|
||||
// returned no results we expect it to be set to the max index of the table,
|
||||
// however we can't guarantee this always happens.
|
||||
// To prevent a client from accidentally performing many non-blocking queries
|
||||
// (which causes lots of unnecessary load), we always set a default value of 1.
|
||||
// This is sufficient to prevent the unnecessary load in most cases.
|
||||
if index < 1 {
|
||||
index = 1
|
||||
}
|
||||
|
||||
switch {
|
||||
case errors.Is(err, ErrorNotFound):
|
||||
|
|
|
@ -103,6 +103,34 @@ func TestServerLocalBlockingQuery_NonBlocking(t *testing.T) {
|
|||
require.Equal(t, &testResult{value: "foo"}, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_Index0(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Once()
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").Return(store).Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
// the index 0 returned here should get translated to 1 by ServerLocalBlockingQuery
|
||||
Return(uint64(0), &testResult{value: "foo"}, nil).
|
||||
Once()
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
provider.getStore,
|
||||
0,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, idx)
|
||||
require.Equal(t, &testResult{value: "foo"}, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_NotFound(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
@ -387,8 +415,10 @@ func TestServerLocalNotify_internal(t *testing.T) {
|
|||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(0), nilResult, fmt.Errorf("injected error")).
|
||||
Times(3)
|
||||
// we should only notify the first time as the index of 1 wont exceed the min index
|
||||
// after the second two queries.
|
||||
provider.On("notify", ctx, "test", nilResult, fmt.Errorf("injected error")).
|
||||
Times(3)
|
||||
Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(7), &testResult{value: "foo"}, nil).
|
||||
Once()
|
||||
|
|
Loading…
Reference in New Issue