mirror of https://github.com/hashicorp/consul
Fix ConnectQueryBlocking test
parent
86342e4bca
commit
9f233dece2
|
@ -2022,7 +2022,7 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||
// Gateways are tracked in a separate table, and we append them to the result set.
|
||||
// We append rather than replace since it allows users to migrate a service
|
||||
// to the mesh with a mix of sidecars and gateways until all its instances have a sidecar.
|
||||
var gatewayChan <-chan struct{}
|
||||
var gatewayNodesCh <-chan struct{}
|
||||
if connect {
|
||||
// Look up gateway nodes associated with the service
|
||||
_, nodes, _, err := s.serviceGatewayNodes(tx, serviceName, structs.ServiceKindTerminatingGateway, entMeta)
|
||||
|
@ -2092,7 +2092,11 @@ func (s *Store) checkServiceNodesTxn(tx *memdb.Txn, ws memdb.WatchSet, serviceNa
|
|||
fallbackWS = ws
|
||||
// We also need to watch the iterator from earlier too.
|
||||
fallbackWS.Add(iter.WatchCh())
|
||||
fallbackWS.Add(gatewayChan)
|
||||
|
||||
// This channel will be nil if there are no known associations between the service and a gateway
|
||||
if gatewayNodesCh != nil {
|
||||
fallbackWS.Add(gatewayNodesCh)
|
||||
}
|
||||
} else if connect {
|
||||
// If this is a connect query then there is a subtlety to watch out for.
|
||||
// In addition to watching the proxy service indexes for changes above, we
|
||||
|
|
|
@ -3014,16 +3014,16 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
setupFn: nil,
|
||||
svc: "test",
|
||||
wantBeforeResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantBeforeWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantBeforeWatchSetSize: 2,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterService(t, s, 4, "node1", "test")
|
||||
},
|
||||
shouldFire: false,
|
||||
wantAfterIndex: 4, // No results falls back to global service index
|
||||
wantAfterResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantAfterWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantAfterWatchSetSize: 2,
|
||||
},
|
||||
{
|
||||
name: "not affected by non-connect-enabled target service de-registration",
|
||||
|
@ -3032,8 +3032,8 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantBeforeWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantBeforeWatchSetSize: 2,
|
||||
updateFn: func(s *Store) {
|
||||
require.NoError(t, s.DeleteService(5, "node1", "test", nil))
|
||||
},
|
||||
|
@ -3044,25 +3044,25 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
shouldFire: false,
|
||||
wantAfterIndex: 5, // No results falls back to global service index
|
||||
wantAfterResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantAfterWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantAfterWatchSetSize: 2,
|
||||
},
|
||||
{
|
||||
name: "unblocks on first connect-native service registration",
|
||||
setupFn: nil,
|
||||
svc: "test",
|
||||
wantBeforeResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantBeforeWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantBeforeWatchSetSize: 2,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterConnectNativeService(t, s, 4, "node1", "test")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 4,
|
||||
wantAfterResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on subsequent connect-native service registration",
|
||||
|
@ -3071,18 +3071,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterConnectNativeService(t, s, 5, "node2", "test")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 5,
|
||||
wantAfterResLen: 2,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on connect-native service de-registration",
|
||||
|
@ -3092,18 +3092,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 2,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
require.NoError(t, s.DeleteService(6, "node2", "test", nil))
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 6,
|
||||
wantAfterResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on last connect-native service de-registration",
|
||||
|
@ -3112,34 +3112,34 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
require.NoError(t, s.DeleteService(6, "node1", "test", nil))
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 6,
|
||||
wantAfterResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantAfterWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantAfterWatchSetSize: 2,
|
||||
},
|
||||
{
|
||||
name: "unblocks on first proxy service registration",
|
||||
setupFn: nil,
|
||||
svc: "test",
|
||||
wantBeforeResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantBeforeWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantBeforeWatchSetSize: 2,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterSidecarProxy(t, s, 4, "node1", "test")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 4,
|
||||
wantAfterResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on subsequent proxy service registration",
|
||||
|
@ -3148,18 +3148,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterSidecarProxy(t, s, 5, "node2", "test")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 5,
|
||||
wantAfterResLen: 2,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on proxy service de-registration",
|
||||
|
@ -3169,18 +3169,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 2,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
require.NoError(t, s.DeleteService(6, "node2", "test-sidecar-proxy", nil))
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 6,
|
||||
wantAfterResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on last proxy service de-registration",
|
||||
|
@ -3189,17 +3189,17 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
require.NoError(t, s.DeleteService(6, "node1", "test-sidecar-proxy", nil))
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 6,
|
||||
wantAfterResLen: 0,
|
||||
// Only the connect index iterator is watched
|
||||
wantAfterWatchSetSize: 1,
|
||||
// The connect index and gateway-services iterators are watched
|
||||
wantAfterWatchSetSize: 2,
|
||||
},
|
||||
{
|
||||
name: "unblocks on connect-native service health check change",
|
||||
|
@ -3209,18 +3209,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterCheck(t, s, 7, "node1", "test", "check1", "critical")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 7,
|
||||
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on proxy service health check change",
|
||||
|
@ -3230,18 +3230,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 7,
|
||||
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on connect-native node health check change",
|
||||
|
@ -3251,18 +3251,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 7,
|
||||
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
name: "unblocks on proxy service health check change",
|
||||
|
@ -3272,18 +3272,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterCheck(t, s, 7, "node1", "", "check1", "critical")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 7,
|
||||
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
// See https://github.com/hashicorp/consul/issues/5506. The issue is cause
|
||||
|
@ -3302,18 +3302,18 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
testRegisterCheck(t, s, 7, "node1", "test-sidecar-proxy", "check1", "critical")
|
||||
},
|
||||
shouldFire: true,
|
||||
wantAfterIndex: 7,
|
||||
wantAfterResLen: 1, // critical filtering doesn't happen in the state store method.
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantAfterWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
},
|
||||
{
|
||||
// See https://github.com/hashicorp/consul/issues/5506. This is the edge
|
||||
|
@ -3324,9 +3324,9 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
},
|
||||
svc: "test",
|
||||
wantBeforeResLen: 1,
|
||||
// Should take the optimized path where we only watch the service index
|
||||
// and the connect index iterator.
|
||||
wantBeforeWatchSetSize: 2,
|
||||
// Should take the optimized path where we only watch the service index,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantBeforeWatchSetSize: 3,
|
||||
updateFn: func(s *Store) {
|
||||
// Register a new result with a different service name could be another
|
||||
// proxy with a different name, but a native instance works too.
|
||||
|
@ -3335,9 +3335,9 @@ func TestStateStore_ConnectQueryBlocking(t *testing.T) {
|
|||
shouldFire: true,
|
||||
wantAfterIndex: 5,
|
||||
wantAfterResLen: 2,
|
||||
// Should take the optimized path where we only watch the teo service
|
||||
// indexes and the connect index iterator.
|
||||
wantAfterWatchSetSize: 3,
|
||||
// Should take the optimized path where we only watch the service indexes,
|
||||
// connect index iterator, and gateway-services iterator.
|
||||
wantAfterWatchSetSize: 4,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue