diff --git a/.changelog/15789.txt b/.changelog/15789.txt new file mode 100644 index 0000000000..682b4bd1e2 --- /dev/null +++ b/.changelog/15789.txt @@ -0,0 +1,3 @@ +```release-note:bug +xds: fix bug where sessions for locally-managed services could fail with "this server has too many xDS streams open" +``` diff --git a/GNUmakefile b/GNUmakefile index 926e1a3e73..fc79d47518 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -8,7 +8,7 @@ SHELL = bash # or the string @DEV to imply use what is currently installed locally. ### GOLANGCI_LINT_VERSION='v1.50.1' -MOCKERY_VERSION='v2.12.2' +MOCKERY_VERSION='v2.15.0' BUF_VERSION='v1.4.0' PROTOC_GEN_GO_GRPC_VERSION="v1.2.0" MOG_VERSION='v0.3.0' diff --git a/agent/agent.go b/agent/agent.go index a17d8d5317..4ecd2a66ca 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -835,6 +835,7 @@ func (a *Agent) listenAndServeGRPC() error { Manager: a.proxyConfig, GetStore: func() catalogproxycfg.Store { return server.FSM().State() }, Logger: a.proxyConfig.Logger.Named("server-catalog"), + SessionLimiter: a.baseDeps.XDSStreamLimiter, }) go func() { <-a.shutdownCh @@ -851,7 +852,6 @@ func (a *Agent) listenAndServeGRPC() error { return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) }, a, - a.baseDeps.XDSStreamLimiter, ) a.xdsServer.Register(a.externalGRPCServer) diff --git a/agent/cache/mock_Request.go b/agent/cache/mock_Request.go index dd585c57e0..30fda56461 100644 --- a/agent/cache/mock_Request.go +++ b/agent/cache/mock_Request.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cache -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockRequest is an autogenerated mock type for the Request type type MockRequest struct { @@ -27,8 +23,13 @@ func (_m *MockRequest) CacheInfo() RequestInfo { return r0 } -// NewMockRequest creates a new instance of MockRequest. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockRequest(t testing.TB) *MockRequest { +type mockConstructorTestingTNewMockRequest interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockRequest creates a new instance of MockRequest. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockRequest(t mockConstructorTestingTNewMockRequest) *MockRequest { mock := &MockRequest{} mock.Mock.Test(t) diff --git a/agent/cache/mock_Type.go b/agent/cache/mock_Type.go index 76642bb5ce..9f1a324222 100644 --- a/agent/cache/mock_Type.go +++ b/agent/cache/mock_Type.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package cache -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockType is an autogenerated mock type for the Type type type MockType struct { @@ -48,8 +44,13 @@ func (_m *MockType) RegisterOptions() RegisterOptions { return r0 } -// NewMockType creates a new instance of MockType. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockType(t testing.TB) *MockType { +type mockConstructorTestingTNewMockType interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockType creates a new instance of MockType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockType(t mockConstructorTestingTNewMockType) *MockType { mock := &MockType{} mock.Mock.Test(t) diff --git a/agent/connect/ca/mock_Provider.go b/agent/connect/ca/mock_Provider.go index 5c9bfcfcd2..50136cbc74 100644 --- a/agent/connect/ca/mock_Provider.go +++ b/agent/connect/ca/mock_Provider.go @@ -1,13 +1,11 @@ -// Code generated by mockery v2.11.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package ca import ( - testing "testing" + x509 "crypto/x509" mock "github.com/stretchr/testify/mock" - - x509 "crypto/x509" ) // MockProvider is an autogenerated mock type for the Provider type @@ -155,13 +153,13 @@ func (_m *MockProvider) GenerateRoot() (RootResult, error) { return r0, r1 } -// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM -func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, keyId string) error { - ret := _m.Called(intermediatePEM, rootPEM, keyId) +// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM, opaque +func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, opaque string) error { + ret := _m.Called(intermediatePEM, rootPEM, opaque) var r0 error if rf, ok := ret.Get(0).(func(string, string, string) error); ok { - r0 = rf(intermediatePEM, rootPEM, keyId) + r0 = rf(intermediatePEM, rootPEM, opaque) } else { r0 = ret.Error(0) } @@ -255,9 +253,15 @@ func (_m *MockProvider) SupportsCrossSigning() (bool, error) { return r0, r1 } -// NewMockProvider creates a new instance of MockProvider. It also registers a cleanup function to assert the mocks expectations. -func NewMockProvider(t testing.TB) *MockProvider { +type mockConstructorTestingTNewMockProvider interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockProvider creates a new instance of MockProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockProvider(t mockConstructorTestingTNewMockProvider) *MockProvider { mock := &MockProvider{} + mock.Mock.Test(t) t.Cleanup(func() { mock.AssertExpectations(t) }) diff --git a/agent/consul/auth/mock_ACLCache.go b/agent/consul/auth/mock_ACLCache.go index e8e5c68283..2df0b0e382 100644 --- a/agent/consul/auth/mock_ACLCache.go +++ b/agent/consul/auth/mock_ACLCache.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package auth -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockACLCache is an autogenerated mock type for the ACLCache type type MockACLCache struct { @@ -18,8 +14,13 @@ func (_m *MockACLCache) RemoveIdentityWithSecretToken(secretToken string) { _m.Called(secretToken) } -// NewMockACLCache creates a new instance of MockACLCache. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLCache(t testing.TB) *MockACLCache { +type mockConstructorTestingTNewMockACLCache interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLCache creates a new instance of MockACLCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLCache(t mockConstructorTestingTNewMockACLCache) *MockACLCache { mock := &MockACLCache{} mock.Mock.Test(t) diff --git a/agent/consul/autopilotevents/mock_Publisher_test.go b/agent/consul/autopilotevents/mock_Publisher_test.go index c0a736be3d..aeaf39f5b4 100644 --- a/agent/consul/autopilotevents/mock_Publisher_test.go +++ b/agent/consul/autopilotevents/mock_Publisher_test.go @@ -1,10 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package autopilotevents import ( - testing "testing" - stream "github.com/hashicorp/consul/agent/consul/stream" mock "github.com/stretchr/testify/mock" ) @@ -19,8 +17,13 @@ func (_m *MockPublisher) Publish(_a0 []stream.Event) { _m.Called(_a0) } -// NewMockPublisher creates a new instance of MockPublisher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockPublisher(t testing.TB) *MockPublisher { +type mockConstructorTestingTNewMockPublisher interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockPublisher creates a new instance of MockPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockPublisher(t mockConstructorTestingTNewMockPublisher) *MockPublisher { mock := &MockPublisher{} mock.Mock.Test(t) diff --git a/agent/consul/autopilotevents/mock_StateStore_test.go b/agent/consul/autopilotevents/mock_StateStore_test.go index dd048e58eb..9391f21da9 100644 --- a/agent/consul/autopilotevents/mock_StateStore_test.go +++ b/agent/consul/autopilotevents/mock_StateStore_test.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package autopilotevents @@ -10,8 +10,6 @@ import ( structs "github.com/hashicorp/consul/agent/structs" - testing "testing" - types "github.com/hashicorp/consul/types" ) @@ -80,8 +78,13 @@ func (_m *MockStateStore) NodeService(ws memdb.WatchSet, nodeName string, servic return r0, r1, r2 } -// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockStateStore(t testing.TB) *MockStateStore { +type mockConstructorTestingTNewMockStateStore interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore { mock := &MockStateStore{} mock.Mock.Test(t) diff --git a/agent/consul/autopilotevents/mock_timeProvider_test.go b/agent/consul/autopilotevents/mock_timeProvider_test.go index 4640fadde0..ccc312df3e 100644 --- a/agent/consul/autopilotevents/mock_timeProvider_test.go +++ b/agent/consul/autopilotevents/mock_timeProvider_test.go @@ -1,13 +1,11 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package autopilotevents import ( - testing "testing" + time "time" mock "github.com/stretchr/testify/mock" - - time "time" ) // mockTimeProvider is an autogenerated mock type for the timeProvider type @@ -29,8 +27,13 @@ func (_m *mockTimeProvider) Now() time.Time { return r0 } -// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func newMockTimeProvider(t testing.TB) *mockTimeProvider { +type mockConstructorTestingTnewMockTimeProvider interface { + mock.TestingT + Cleanup(func()) +} + +// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func newMockTimeProvider(t mockConstructorTestingTnewMockTimeProvider) *mockTimeProvider { mock := &mockTimeProvider{} mock.Mock.Test(t) diff --git a/agent/consul/watch/mock_StateStore_test.go b/agent/consul/watch/mock_StateStore_test.go index 08d58e2f04..37cc383669 100644 --- a/agent/consul/watch/mock_StateStore_test.go +++ b/agent/consul/watch/mock_StateStore_test.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package watch -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockStateStore is an autogenerated mock type for the StateStore type type MockStateStore struct { @@ -29,8 +25,13 @@ func (_m *MockStateStore) AbandonCh() <-chan struct{} { return r0 } -// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockStateStore(t testing.TB) *MockStateStore { +type mockConstructorTestingTNewMockStateStore interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore { mock := &MockStateStore{} mock.Mock.Test(t) diff --git a/agent/grpc-external/limiter/limiter.go b/agent/grpc-external/limiter/limiter.go index 115ce0623b..9c0fcf795b 100644 --- a/agent/grpc-external/limiter/limiter.go +++ b/agent/grpc-external/limiter/limiter.go @@ -213,6 +213,10 @@ func (l *SessionLimiter) deleteSessionWithID(id uint64) { l.deleteSessionLocked(idx, id) } +// SessionTerminatedChan is a channel that will be closed to notify session- +// holders that a session has been terminated. +type SessionTerminatedChan <-chan struct{} + // Session allows its holder to perform an operation (e.g. serve a gRPC stream) // concurrenly with other session-holders. Sessions may be terminated abruptly // by the SessionLimiter, so it is the responsibility of the holder to receive @@ -228,7 +232,7 @@ type Session interface { // // The session-holder MUST receive on it and exit (e.g. close the gRPC stream) // when it is closed. - Terminated() <-chan struct{} + Terminated() SessionTerminatedChan } type session struct { @@ -240,6 +244,6 @@ type session struct { func (s *session) End() { s.l.deleteSessionWithID(s.id) } -func (s *session) Terminated() <-chan struct{} { return s.termCh } +func (s *session) Terminated() SessionTerminatedChan { return s.termCh } func (s *session) terminate() { close(s.termCh) } diff --git a/agent/grpc-external/services/acl/mock_Login.go b/agent/grpc-external/services/acl/mock_Login.go index 3c33169a86..d20d676d40 100644 --- a/agent/grpc-external/services/acl/mock_Login.go +++ b/agent/grpc-external/services/acl/mock_Login.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package acl @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" - - testing "testing" ) // MockLogin is an autogenerated mock type for the Login type @@ -39,8 +37,13 @@ func (_m *MockLogin) TokenForVerifiedIdentity(identity *authmethod.Identity, aut return r0, r1 } -// NewMockLogin creates a new instance of MockLogin. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockLogin(t testing.TB) *MockLogin { +type mockConstructorTestingTNewMockLogin interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockLogin creates a new instance of MockLogin. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockLogin(t mockConstructorTestingTNewMockLogin) *MockLogin { mock := &MockLogin{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/acl/mock_TokenWriter.go b/agent/grpc-external/services/acl/mock_TokenWriter.go index 19408afc88..a0a31b948c 100644 --- a/agent/grpc-external/services/acl/mock_TokenWriter.go +++ b/agent/grpc-external/services/acl/mock_TokenWriter.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package acl -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockTokenWriter is an autogenerated mock type for the TokenWriter type type MockTokenWriter struct { @@ -27,8 +23,13 @@ func (_m *MockTokenWriter) Delete(secretID string, fromLogout bool) error { return r0 } -// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockTokenWriter(t testing.TB) *MockTokenWriter { +type mockConstructorTestingTNewMockTokenWriter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockTokenWriter(t mockConstructorTestingTNewMockTokenWriter) *MockTokenWriter { mock := &MockTokenWriter{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/acl/mock_Validator.go b/agent/grpc-external/services/acl/mock_Validator.go index 3c27ec38ba..3436762d24 100644 --- a/agent/grpc-external/services/acl/mock_Validator.go +++ b/agent/grpc-external/services/acl/mock_Validator.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package acl @@ -8,8 +8,6 @@ import ( authmethod "github.com/hashicorp/consul/agent/consul/authmethod" mock "github.com/stretchr/testify/mock" - - testing "testing" ) // MockValidator is an autogenerated mock type for the Validator type @@ -40,8 +38,13 @@ func (_m *MockValidator) ValidateLogin(ctx context.Context, loginToken string) ( return r0, r1 } -// NewMockValidator creates a new instance of MockValidator. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockValidator(t testing.TB) *MockValidator { +type mockConstructorTestingTNewMockValidator interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockValidator creates a new instance of MockValidator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockValidator(t mockConstructorTestingTNewMockValidator) *MockValidator { mock := &MockValidator{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/connectca/mock_ACLResolver.go b/agent/grpc-external/services/connectca/mock_ACLResolver.go index 24fb26a225..9c2f2cac2c 100644 --- a/agent/grpc-external/services/connectca/mock_ACLResolver.go +++ b/agent/grpc-external/services/connectca/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package connectca @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/connectca/mock_CAManager.go b/agent/grpc-external/services/connectca/mock_CAManager.go index 2667692c33..296186bbe5 100644 --- a/agent/grpc-external/services/connectca/mock_CAManager.go +++ b/agent/grpc-external/services/connectca/mock_CAManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package connectca @@ -8,8 +8,6 @@ import ( structs "github.com/hashicorp/consul/agent/structs" - testing "testing" - x509 "crypto/x509" ) @@ -41,8 +39,13 @@ func (_m *MockCAManager) AuthorizeAndSignCertificate(csr *x509.CertificateReques return r0, r1 } -// NewMockCAManager creates a new instance of MockCAManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockCAManager(t testing.TB) *MockCAManager { +type mockConstructorTestingTNewMockCAManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockCAManager creates a new instance of MockCAManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockCAManager(t mockConstructorTestingTNewMockCAManager) *MockCAManager { mock := &MockCAManager{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/dataplane/mock_ACLResolver.go b/agent/grpc-external/services/dataplane/mock_ACLResolver.go index 0408d3a50c..7818116240 100644 --- a/agent/grpc-external/services/dataplane/mock_ACLResolver.go +++ b/agent/grpc-external/services/dataplane/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package dataplane @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/peerstream/mock_ACLResolver.go b/agent/grpc-external/services/peerstream/mock_ACLResolver.go index d0e6720887..e4027a5da5 100644 --- a/agent/grpc-external/services/peerstream/mock_ACLResolver.go +++ b/agent/grpc-external/services/peerstream/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package peerstream @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go b/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go index 850ec8bb95..4192be1ab9 100644 --- a/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go +++ b/agent/grpc-external/services/serverdiscovery/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package serverdiscovery @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/hcp/mock_Client.go b/agent/hcp/mock_Client.go index 4466bf2382..29bd27cbf1 100644 --- a/agent/hcp/mock_Client.go +++ b/agent/hcp/mock_Client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.13.1. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package hcp diff --git a/agent/hcp/scada/mock_Provider.go b/agent/hcp/scada/mock_Provider.go index 6e12d63f62..251178095b 100644 --- a/agent/hcp/scada/mock_Provider.go +++ b/agent/hcp/scada/mock_Provider.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.14.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package scada diff --git a/agent/proxycfg-sources/catalog/config_source.go b/agent/proxycfg-sources/catalog/config_source.go index a6d60c328f..1080c64cec 100644 --- a/agent/proxycfg-sources/catalog/config_source.go +++ b/agent/proxycfg-sources/catalog/config_source.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" @@ -44,13 +45,24 @@ func NewConfigSource(cfg Config) *ConfigSource { // Watch wraps the underlying proxycfg.Manager and dynamically registers // services from the catalog with it when requested by the xDS server. -func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { // If the service is registered to the local agent, use the LocalConfigSource // rather than trying to configure it from the catalog. if nodeName == m.NodeName && m.LocalState.ServiceExists(serviceID) { return m.LocalConfigSource.Watch(serviceID, nodeName, token) } + // Begin a session with the xDS session concurrency limiter. + // + // We do this here rather than in the xDS server because we don't want to apply + // the limit to services from the LocalConfigSource. + // + // See: https://github.com/hashicorp/consul/issues/15753 + session, err := m.SessionLimiter.BeginSession() + if err != nil { + return nil, nil, nil, err + } + proxyID := proxycfg.ProxyID{ ServiceID: serviceID, NodeName: nodeName, @@ -66,6 +78,7 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token cancelOnce.Do(func() { cancelWatch() m.cleanup(proxyID) + session.End() }) } @@ -82,11 +95,12 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token if err := m.startSync(w.closeCh, proxyID); err != nil { delete(m.watches, proxyID) cancelWatch() - return nil, nil, err + session.End() + return nil, nil, nil, err } } - return snapCh, cancel, nil + return snapCh, session.Terminated(), cancel, nil } func (m *ConfigSource) Shutdown() { @@ -261,6 +275,9 @@ type Config struct { // Logger will be used to write log messages. Logger hclog.Logger + + // SessionLimiter is used to enforce xDS concurrency limits. + SessionLimiter SessionLimiter } //go:generate mockery --name ConfigManager --inpackage @@ -278,5 +295,10 @@ type Store interface { //go:generate mockery --name Watcher --inpackage type Watcher interface { - Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) + Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) +} + +//go:generate mockery --name SessionLimiter --inpackage +type SessionLimiter interface { + BeginSession() (limiter.Session, error) } diff --git a/agent/proxycfg-sources/catalog/config_source_test.go b/agent/proxycfg-sources/catalog/config_source_test.go index cf460a0e42..e9ecbc532c 100644 --- a/agent/proxycfg-sources/catalog/config_source_test.go +++ b/agent/proxycfg-sources/catalog/config_source_test.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" @@ -47,16 +48,33 @@ func TestConfigSource_Success(t *testing.T) { // behavior without sleeping which leads to slow/racy tests. cfgMgr := testConfigManager(t, serviceID, nodeName, token) + lim := NewMockSessionLimiter(t) + + session1 := newMockSession(t) + session1TermCh := make(limiter.SessionTerminatedChan) + session1.On("Terminated").Return(session1TermCh) + session1.On("End").Return() + + session2 := newMockSession(t) + session2TermCh := make(limiter.SessionTerminatedChan) + session2.On("Terminated").Return(session2TermCh) + session2.On("End").Return() + + lim.On("BeginSession").Return(session1, nil).Once() + lim.On("BeginSession").Return(session2, nil).Once() + mgr := NewConfigSource(Config{ - Manager: cfgMgr, - LocalState: testLocalState(t), - Logger: hclog.NewNullLogger(), - GetStore: func() Store { return store }, + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: lim, }) t.Cleanup(mgr.Shutdown) - snapCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token) + snapCh, termCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token) require.NoError(t, err) + require.Equal(t, session1TermCh, termCh) // Expect Register to have been called with the proxy's inital port. select { @@ -111,8 +129,9 @@ func TestConfigSource_Success(t *testing.T) { } // Start another watch. - _, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token) + _, termCh2, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token) require.NoError(t, err) + require.Equal(t, session2TermCh, termCh2) // Expect the service to have not been re-registered by the second watch. select { @@ -137,6 +156,9 @@ func TestConfigSource_Success(t *testing.T) { case <-time.After(100 * time.Millisecond): t.Fatal("timeout waiting for service to be de-registered") } + + session1.AssertCalled(t, "End") + session2.AssertCalled(t, "End") } func TestConfigSource_LocallyManagedService(t *testing.T) { @@ -149,7 +171,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { localWatcher := NewMockWatcher(t) localWatcher.On("Watch", serviceID, nodeName, token). - Return(make(<-chan *proxycfg.ConfigSnapshot), proxycfg.CancelFunc(func() {}), nil) + Return(make(<-chan *proxycfg.ConfigSnapshot), nil, proxycfg.CancelFunc(func() {}), nil) mgr := NewConfigSource(Config{ NodeName: nodeName, @@ -157,10 +179,11 @@ func TestConfigSource_LocallyManagedService(t *testing.T) { LocalConfigSource: localWatcher, Logger: hclog.NewNullLogger(), GetStore: func() Store { panic("state store shouldn't have been used") }, + SessionLimiter: nullSessionLimiter{}, }) t.Cleanup(mgr.Shutdown) - _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(serviceID, nodeName, token) require.NoError(t, err) } @@ -192,17 +215,26 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) { cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(errors.New("KABOOM")) + session := newMockSession(t) + session.On("End").Return() + + lim := NewMockSessionLimiter(t) + lim.On("BeginSession").Return(session, nil) + mgr := NewConfigSource(Config{ - Manager: cfgMgr, - LocalState: testLocalState(t), - Logger: hclog.NewNullLogger(), - GetStore: func() Store { return store }, + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: lim, }) t.Cleanup(mgr.Shutdown) - _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(serviceID, nodeName, token) require.Error(t, err) require.True(t, canceledWatch, "watch should've been canceled") + + session.AssertCalled(t, "End") } func TestConfigSource_NotProxyService(t *testing.T) { @@ -231,19 +263,38 @@ func TestConfigSource_NotProxyService(t *testing.T) { Return(make(<-chan *proxycfg.ConfigSnapshot), cancel) mgr := NewConfigSource(Config{ - Manager: cfgMgr, - LocalState: testLocalState(t), - Logger: hclog.NewNullLogger(), - GetStore: func() Store { return store }, + Manager: cfgMgr, + LocalState: testLocalState(t), + Logger: hclog.NewNullLogger(), + GetStore: func() Store { return store }, + SessionLimiter: nullSessionLimiter{}, }) t.Cleanup(mgr.Shutdown) - _, _, err := mgr.Watch(serviceID, nodeName, token) + _, _, _, err := mgr.Watch(serviceID, nodeName, token) require.Error(t, err) require.Contains(t, err.Error(), "must be a sidecar proxy or gateway") require.True(t, canceledWatch, "watch should've been canceled") } +func TestConfigSource_SessionLimiterError(t *testing.T) { + lim := NewMockSessionLimiter(t) + lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached) + + src := NewConfigSource(Config{ + LocalState: testLocalState(t), + SessionLimiter: lim, + }) + t.Cleanup(src.Shutdown) + + _, _, _, err := src.Watch( + structs.NewServiceID("web-sidecar-proxy-1", nil), + "node-name", + "token", + ) + require.Equal(t, limiter.ErrCapacityReached, err) +} + func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) ConfigManager { t.Helper() @@ -294,3 +345,34 @@ func testLocalState(t *testing.T) *local.State { l.TriggerSyncChanges = func() {} return l } + +type nullSessionLimiter struct{} + +func (nullSessionLimiter) BeginSession() (limiter.Session, error) { + return nullSession{}, nil +} + +type nullSession struct{} + +func (nullSession) End() {} + +func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil } + +type mockSession struct { + mock.Mock +} + +func newMockSession(t *testing.T) *mockSession { + m := &mockSession{} + m.Mock.Test(t) + + t.Cleanup(func() { m.AssertExpectations(t) }) + + return m +} + +func (m *mockSession) End() { m.Called() } + +func (m *mockSession) Terminated() limiter.SessionTerminatedChan { + return m.Called().Get(0).(limiter.SessionTerminatedChan) +} diff --git a/agent/proxycfg-sources/catalog/mock_ConfigManager.go b/agent/proxycfg-sources/catalog/mock_ConfigManager.go index 047b61c878..3ae51c5f6a 100644 --- a/agent/proxycfg-sources/catalog/mock_ConfigManager.go +++ b/agent/proxycfg-sources/catalog/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package catalog @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" - - testing "testing" ) // MockConfigManager is an autogenerated mock type for the ConfigManager type @@ -60,8 +58,13 @@ func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan *proxycfg.Confi return r0, r1 } -// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockConfigManager(t testing.TB) *MockConfigManager { +type mockConstructorTestingTNewMockConfigManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager { mock := &MockConfigManager{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/catalog/mock_SessionLimiter.go b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go new file mode 100644 index 0000000000..3b7147cb06 --- /dev/null +++ b/agent/proxycfg-sources/catalog/mock_SessionLimiter.go @@ -0,0 +1,51 @@ +// Code generated by mockery v2.15.0. DO NOT EDIT. + +package catalog + +import ( + limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" + mock "github.com/stretchr/testify/mock" +) + +// MockSessionLimiter is an autogenerated mock type for the SessionLimiter type +type MockSessionLimiter struct { + mock.Mock +} + +// BeginSession provides a mock function with given fields: +func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) { + ret := _m.Called() + + var r0 limiter.Session + if rf, ok := ret.Get(0).(func() limiter.Session); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(limiter.Session) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewMockSessionLimiter interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockSessionLimiter(t mockConstructorTestingTNewMockSessionLimiter) *MockSessionLimiter { + mock := &MockSessionLimiter{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/proxycfg-sources/catalog/mock_Watcher.go b/agent/proxycfg-sources/catalog/mock_Watcher.go index 193b7d844f..d5ca046a40 100644 --- a/agent/proxycfg-sources/catalog/mock_Watcher.go +++ b/agent/proxycfg-sources/catalog/mock_Watcher.go @@ -1,14 +1,14 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package catalog import ( - proxycfg "github.com/hashicorp/consul/agent/proxycfg" + limiter "github.com/hashicorp/consul/agent/grpc-external/limiter" mock "github.com/stretchr/testify/mock" - structs "github.com/hashicorp/consul/agent/structs" + proxycfg "github.com/hashicorp/consul/agent/proxycfg" - testing "testing" + structs "github.com/hashicorp/consul/agent/structs" ) // MockWatcher is an autogenerated mock type for the Watcher type @@ -17,7 +17,7 @@ type MockWatcher struct { } // Watch provides a mock function with given fields: proxyID, nodeName, token -func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { ret := _m.Called(proxyID, nodeName, token) var r0 <-chan *proxycfg.ConfigSnapshot @@ -29,27 +29,41 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s } } - var r1 proxycfg.CancelFunc - if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok { + var r1 limiter.SessionTerminatedChan + if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) limiter.SessionTerminatedChan); ok { r1 = rf(proxyID, nodeName, token) } else { if ret.Get(1) != nil { - r1 = ret.Get(1).(proxycfg.CancelFunc) + r1 = ret.Get(1).(limiter.SessionTerminatedChan) } } - var r2 error - if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) error); ok { + var r2 proxycfg.CancelFunc + if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok { r2 = rf(proxyID, nodeName, token) } else { - r2 = ret.Error(2) + if ret.Get(2) != nil { + r2 = ret.Get(2).(proxycfg.CancelFunc) + } } - return r0, r1, r2 + var r3 error + if rf, ok := ret.Get(3).(func(structs.ServiceID, string, string) error); ok { + r3 = rf(proxyID, nodeName, token) + } else { + r3 = ret.Error(3) + } + + return r0, r1, r2, r3 } -// NewMockWatcher creates a new instance of MockWatcher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockWatcher(t testing.TB) *MockWatcher { +type mockConstructorTestingTNewMockWatcher interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockWatcher(t mockConstructorTestingTNewMockWatcher) *MockWatcher { mock := &MockWatcher{} mock.Mock.Test(t) diff --git a/agent/proxycfg-sources/local/config_source.go b/agent/proxycfg-sources/local/config_source.go index b23316d536..903ad7a6b1 100644 --- a/agent/proxycfg-sources/local/config_source.go +++ b/agent/proxycfg-sources/local/config_source.go @@ -1,6 +1,7 @@ package local import ( + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" structs "github.com/hashicorp/consul/agent/structs" ) @@ -16,7 +17,7 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource { return &ConfigSource{cfgMgr} } -func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{ ServiceID: serviceID, NodeName: nodeName, @@ -27,5 +28,5 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ str // is checked before the watch is created). Token: "", }) - return watchCh, cancelWatch, nil + return watchCh, nil, cancelWatch, nil } diff --git a/agent/proxycfg-sources/local/mock_ConfigManager.go b/agent/proxycfg-sources/local/mock_ConfigManager.go index 0f77ce0651..8f2c8fc6c8 100644 --- a/agent/proxycfg-sources/local/mock_ConfigManager.go +++ b/agent/proxycfg-sources/local/mock_ConfigManager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package local @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" - - testing "testing" ) // MockConfigManager is an autogenerated mock type for the ConfigManager type @@ -76,8 +74,13 @@ func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan *proxycfg.Config return r0, r1 } -// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockConfigManager(t testing.TB) *MockConfigManager { +type mockConstructorTestingTNewMockConfigManager interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager { mock := &MockConfigManager{} mock.Mock.Test(t) diff --git a/agent/submatview/mock_ACLResolver.go b/agent/submatview/mock_ACLResolver.go index 70ac4ac33f..7e1d810c64 100644 --- a/agent/submatview/mock_ACLResolver.go +++ b/agent/submatview/mock_ACLResolver.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.0. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package submatview @@ -7,8 +7,6 @@ import ( mock "github.com/stretchr/testify/mock" resolver "github.com/hashicorp/consul/acl/resolver" - - testing "testing" ) // MockACLResolver is an autogenerated mock type for the ACLResolver type @@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl return r0, r1 } -// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockACLResolver(t testing.TB) *MockACLResolver { +type mockConstructorTestingTNewMockACLResolver interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver { mock := &MockACLResolver{} mock.Mock.Test(t) diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 4c042eeb96..3594a7f7dd 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -3,6 +3,7 @@ package xds import ( "crypto/sha256" "encoding/hex" + "errors" "fmt" "sync" "sync/atomic" @@ -23,6 +24,7 @@ import ( "google.golang.org/grpc/status" external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/serverlessplugin" @@ -89,17 +91,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return err } - session, err := s.SessionLimiter.BeginSession() - if err != nil { - return errOverwhelmed - } - defer session.End() - // Loop state var ( cfgSnap *proxycfg.ConfigSnapshot node *envoy_config_core_v3.Node stateCh <-chan *proxycfg.ConfigSnapshot + drainCh limiter.SessionTerminatedChan watchCancel func() proxyID structs.ServiceID nonce uint64 // xDS requires a unique nonce to correlate response/request pairs @@ -177,7 +174,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove for { select { - case <-session.Terminated(): + case <-drainCh: generator.Logger.Debug("draining stream to rebalance load") metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1) return errOverwhelmed @@ -296,8 +293,11 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } - stateCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) - if err != nil { + stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token) + switch { + case errors.Is(err, limiter.ErrCapacityReached): + return errOverwhelmed + case err != nil: return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err) } // Note that in this case we _intend_ the defer to only be triggered when diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 7e57e3dbad..3de5e128e8 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -32,12 +32,11 @@ import ( func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { for _, serverlessPluginEnabled := range []bool{false, true} { t.Run(fmt.Sprintf("serverless patcher: %t", serverlessPluginEnabled), func(t *testing.T) { - aclResolve := func(id string) (acl.Authorizer, error) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -238,7 +237,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -370,7 +369,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -522,7 +521,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy // This mutateFn causes any endpoint with a name containing "geo-cache" to be @@ -667,7 +666,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -804,7 +803,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1062,7 +1061,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1140,7 +1139,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. false, - nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1240,7 +1238,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. false, - nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1321,7 +1318,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil) + scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("ingress-gateway", nil) @@ -1376,12 +1373,13 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{}) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) mgr.RegisterProxy(t, sid) + mgr.DrainStreams(sid) snap := newTestSnapshot(t, nil, "") @@ -1407,10 +1405,8 @@ func (capacityReachedLimiter) BeginSession() (limiter.Session, error) { } func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { - limiter := &testLimiter{} - aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1439,7 +1435,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { }) testutil.RunStep(t, "terminate limiter session", func(t *testing.T) { - limiter.TerminateSession() + mgr.DrainStreams(sid) select { case err := <-errCh: @@ -1476,25 +1472,6 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { } -type testLimiter struct { - termCh chan struct{} -} - -func (t *testLimiter) BeginSession() (limiter.Session, error) { - t.termCh = make(chan struct{}) - return &testSession{termCh: t.termCh}, nil -} - -func (t *testLimiter) TerminateSession() { close(t.termCh) } - -type testSession struct { - termCh chan struct{} -} - -func (t *testSession) Terminated() <-chan struct{} { return t.termCh } - -func (*testSession) End() {} - func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) { t.Helper() select { diff --git a/agent/xds/server.go b/agent/xds/server.go index 828a4e2021..ee6db90460 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -109,13 +109,7 @@ type ConfigFetcher interface { // ProxyConfigSource is the interface xds.Server requires to consume proxy // config updates. type ProxyConfigSource interface { - Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) -} - -// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend -// on an interface rather than the concrete type so we can mock it in tests. -type SessionLimiter interface { - BeginSession() (limiter.Session, error) + Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) } // Server represents a gRPC server that can handle xDS requests from Envoy. All @@ -124,12 +118,11 @@ type SessionLimiter interface { // A full description of the XDS protocol can be found at // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol type Server struct { - NodeName string - Logger hclog.Logger - CfgSrc ProxyConfigSource - ResolveToken ACLResolverFunc - CfgFetcher ConfigFetcher - SessionLimiter SessionLimiter + NodeName string + Logger hclog.Logger + CfgSrc ProxyConfigSource + ResolveToken ACLResolverFunc + CfgFetcher ConfigFetcher // AuthCheckFrequency is how often we should re-check the credentials used // during a long-lived gRPC Stream after it has been initially established. @@ -181,7 +174,6 @@ func NewServer( cfgMgr ProxyConfigSource, resolveToken ACLResolverFunc, cfgFetcher ConfigFetcher, - limiter SessionLimiter, ) *Server { return &Server{ NodeName: nodeName, @@ -189,7 +181,6 @@ func NewServer( CfgSrc: cfgMgr, ResolveToken: resolveToken, CfgFetcher: cfgFetcher, - SessionLimiter: limiter, AuthCheckFrequency: DefaultAuthCheckFrequency, activeStreams: &activeStreamCounters{}, serverlessPluginEnabled: serverlessPluginEnabled, diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index 7cb413defb..8b04f30a0c 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -67,14 +67,16 @@ func newTestSnapshot( // testing. It also implements ConnectAuthz to allow control over authorization. type testManager struct { sync.Mutex - chans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot - cancels chan structs.ServiceID + stateChans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot + drainChans map[structs.ServiceID]chan struct{} + cancels chan structs.ServiceID } func newTestManager(t *testing.T) *testManager { return &testManager{ - chans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{}, - cancels: make(chan structs.ServiceID, 10), + stateChans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{}, + drainChans: map[structs.ServiceID]chan struct{}{}, + cancels: make(chan structs.ServiceID, 10), } } @@ -82,7 +84,8 @@ func newTestManager(t *testing.T) *testManager { func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) { m.Lock() defer m.Unlock() - m.chans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1) + m.stateChans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1) + m.drainChans[proxyID] = make(chan struct{}) } // Deliver simulates a proxy registration @@ -91,18 +94,42 @@ func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg m.Lock() defer m.Unlock() select { - case m.chans[proxyID] <- cfg: + case m.stateChans[proxyID] <- cfg: case <-time.After(10 * time.Millisecond): t.Fatalf("took too long to deliver config") } } -// Watch implements ConfigManager -func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) { +// DrainStreams drains any open streams for the given proxyID. If there aren't +// any open streams, it'll create a marker so that future attempts to watch the +// given proxyID will return limiter.ErrCapacityReached. +func (m *testManager) DrainStreams(proxyID structs.ServiceID) { m.Lock() defer m.Unlock() + + ch, ok := m.drainChans[proxyID] + if !ok { + ch = make(chan struct{}) + m.drainChans[proxyID] = ch + } + close(ch) +} + +// Watch implements ConfigManager +func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) { + m.Lock() + defer m.Unlock() + + // If the drain chan has already been closed, return limiter.ErrCapacityReached. + drainCh := m.drainChans[proxyID] + select { + case <-drainCh: + return nil, nil, nil, limiter.ErrCapacityReached + default: + } + // ch might be nil but then it will just block forever - return m.chans[proxyID], func() { + return m.stateChans[proxyID], drainCh, func() { m.cancels <- proxyID }, nil } @@ -137,7 +164,6 @@ func newTestServerDeltaScenario( token string, authCheckFrequency time.Duration, serverlessPluginEnabled bool, - sessionLimiter SessionLimiter, ) *testServerScenario { mgr := newTestManager(t) envoy := NewTestEnvoy(t, proxyID, token) @@ -156,10 +182,6 @@ func newTestServerDeltaScenario( metrics.NewGlobal(cfg, sink) }) - if sessionLimiter == nil { - sessionLimiter = limiter.NewSessionLimiter() - } - s := NewServer( "node-123", testutil.Logger(t), @@ -167,7 +189,6 @@ func newTestServerDeltaScenario( mgr, resolveToken, nil, /*cfgFetcher ConfigFetcher*/ - sessionLimiter, ) if authCheckFrequency > 0 { s.AuthCheckFrequency = authCheckFrequency diff --git a/proto-public/pbdns/mock_DNSServiceClient.go b/proto-public/pbdns/mock_DNSServiceClient.go index a11f1e963e..24906ab854 100644 --- a/proto-public/pbdns/mock_DNSServiceClient.go +++ b/proto-public/pbdns/mock_DNSServiceClient.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package pbdns @@ -8,8 +8,6 @@ import ( grpc "google.golang.org/grpc" mock "github.com/stretchr/testify/mock" - - testing "testing" ) // MockDNSServiceClient is an autogenerated mock type for the DNSServiceClient type @@ -47,8 +45,13 @@ func (_m *MockDNSServiceClient) Query(ctx context.Context, in *QueryRequest, opt return r0, r1 } -// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockDNSServiceClient(t testing.TB) *MockDNSServiceClient { +type mockConstructorTestingTNewMockDNSServiceClient interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockDNSServiceClient(t mockConstructorTestingTNewMockDNSServiceClient) *MockDNSServiceClient { mock := &MockDNSServiceClient{} mock.Mock.Test(t) diff --git a/proto-public/pbdns/mock_DNSServiceServer.go b/proto-public/pbdns/mock_DNSServiceServer.go index 97b98dddbd..e9bd338daf 100644 --- a/proto-public/pbdns/mock_DNSServiceServer.go +++ b/proto-public/pbdns/mock_DNSServiceServer.go @@ -1,10 +1,9 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package pbdns import ( context "context" - testing "testing" mock "github.com/stretchr/testify/mock" ) @@ -37,8 +36,13 @@ func (_m *MockDNSServiceServer) Query(_a0 context.Context, _a1 *QueryRequest) (* return r0, r1 } -// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockDNSServiceServer(t testing.TB) *MockDNSServiceServer { +type mockConstructorTestingTNewMockDNSServiceServer interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockDNSServiceServer(t mockConstructorTestingTNewMockDNSServiceServer) *MockDNSServiceServer { mock := &MockDNSServiceServer{} mock.Mock.Test(t) diff --git a/proto-public/pbdns/mock_UnsafeDNSServiceServer.go b/proto-public/pbdns/mock_UnsafeDNSServiceServer.go index a56e55bcb6..0a6c47c2cb 100644 --- a/proto-public/pbdns/mock_UnsafeDNSServiceServer.go +++ b/proto-public/pbdns/mock_UnsafeDNSServiceServer.go @@ -1,12 +1,8 @@ -// Code generated by mockery v2.12.2. DO NOT EDIT. +// Code generated by mockery v2.15.0. DO NOT EDIT. package pbdns -import ( - testing "testing" - - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // MockUnsafeDNSServiceServer is an autogenerated mock type for the UnsafeDNSServiceServer type type MockUnsafeDNSServiceServer struct { @@ -18,8 +14,13 @@ func (_m *MockUnsafeDNSServiceServer) mustEmbedUnimplementedDNSServiceServer() { _m.Called() } -// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. -func NewMockUnsafeDNSServiceServer(t testing.TB) *MockUnsafeDNSServiceServer { +type mockConstructorTestingTNewMockUnsafeDNSServiceServer interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockUnsafeDNSServiceServer(t mockConstructorTestingTNewMockUnsafeDNSServiceServer) *MockUnsafeDNSServiceServer { mock := &MockUnsafeDNSServiceServer{} mock.Mock.Test(t)