xds: don't attempt to load-balance sessions for local proxies (#15789) (#16004)

Previously, we'd begin a session with the xDS concurrency limiter
regardless of whether the proxy was registered in the catalog or in
the server's local agent state.

This caused problems for users who run `consul connect envoy` directly
against a server rather than a client agent, as the server's locally
registered proxies wouldn't be included in the limiter's capacity.

Now, the `ConfigSource` is responsible for beginning the session and we
only do so for services in the catalog.

Fixes: https://github.com/hashicorp/consul/issues/15753

Co-authored-by: Dan Upton <daniel@floppy.co>
pull/16012/head
hc-github-team-consul-core 2023-01-19 05:34:01 -05:00 committed by GitHub
parent aa3f89496d
commit 3e3ab25f9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 473 additions and 251 deletions

3
.changelog/15789.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
xds: fix bug where sessions for locally-managed services could fail with "this server has too many xDS streams open"
```

View File

@ -8,7 +8,7 @@ SHELL = bash
# or the string @DEV to imply use what is currently installed locally.
###
GOLANGCI_LINT_VERSION='v1.50.1'
MOCKERY_VERSION='v2.12.2'
MOCKERY_VERSION='v2.15.0'
BUF_VERSION='v1.4.0'
PROTOC_GEN_GO_GRPC_VERSION="v1.2.0"
MOG_VERSION='v0.3.0'

View File

@ -835,6 +835,7 @@ func (a *Agent) listenAndServeGRPC() error {
Manager: a.proxyConfig,
GetStore: func() catalogproxycfg.Store { return server.FSM().State() },
Logger: a.proxyConfig.Logger.Named("server-catalog"),
SessionLimiter: a.baseDeps.XDSStreamLimiter,
})
go func() {
<-a.shutdownCh
@ -851,7 +852,6 @@ func (a *Agent) listenAndServeGRPC() error {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
},
a,
a.baseDeps.XDSStreamLimiter,
)
a.xdsServer.Register(a.externalGRPCServer)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cache
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockRequest is an autogenerated mock type for the Request type
type MockRequest struct {
@ -27,8 +23,13 @@ func (_m *MockRequest) CacheInfo() RequestInfo {
return r0
}
// NewMockRequest creates a new instance of MockRequest. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRequest(t testing.TB) *MockRequest {
type mockConstructorTestingTNewMockRequest interface {
mock.TestingT
Cleanup(func())
}
// NewMockRequest creates a new instance of MockRequest. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockRequest(t mockConstructorTestingTNewMockRequest) *MockRequest {
mock := &MockRequest{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package cache
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockType is an autogenerated mock type for the Type type
type MockType struct {
@ -48,8 +44,13 @@ func (_m *MockType) RegisterOptions() RegisterOptions {
return r0
}
// NewMockType creates a new instance of MockType. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockType(t testing.TB) *MockType {
type mockConstructorTestingTNewMockType interface {
mock.TestingT
Cleanup(func())
}
// NewMockType creates a new instance of MockType. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockType(t mockConstructorTestingTNewMockType) *MockType {
mock := &MockType{}
mock.Mock.Test(t)

View File

@ -1,13 +1,11 @@
// Code generated by mockery v2.11.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package ca
import (
testing "testing"
x509 "crypto/x509"
mock "github.com/stretchr/testify/mock"
x509 "crypto/x509"
)
// MockProvider is an autogenerated mock type for the Provider type
@ -155,13 +153,13 @@ func (_m *MockProvider) GenerateRoot() (RootResult, error) {
return r0, r1
}
// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM
func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, keyId string) error {
ret := _m.Called(intermediatePEM, rootPEM, keyId)
// SetIntermediate provides a mock function with given fields: intermediatePEM, rootPEM, opaque
func (_m *MockProvider) SetIntermediate(intermediatePEM string, rootPEM string, opaque string) error {
ret := _m.Called(intermediatePEM, rootPEM, opaque)
var r0 error
if rf, ok := ret.Get(0).(func(string, string, string) error); ok {
r0 = rf(intermediatePEM, rootPEM, keyId)
r0 = rf(intermediatePEM, rootPEM, opaque)
} else {
r0 = ret.Error(0)
}
@ -255,9 +253,15 @@ func (_m *MockProvider) SupportsCrossSigning() (bool, error) {
return r0, r1
}
// NewMockProvider creates a new instance of MockProvider. It also registers a cleanup function to assert the mocks expectations.
func NewMockProvider(t testing.TB) *MockProvider {
type mockConstructorTestingTNewMockProvider interface {
mock.TestingT
Cleanup(func())
}
// NewMockProvider creates a new instance of MockProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockProvider(t mockConstructorTestingTNewMockProvider) *MockProvider {
mock := &MockProvider{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package auth
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockACLCache is an autogenerated mock type for the ACLCache type
type MockACLCache struct {
@ -18,8 +14,13 @@ func (_m *MockACLCache) RemoveIdentityWithSecretToken(secretToken string) {
_m.Called(secretToken)
}
// NewMockACLCache creates a new instance of MockACLCache. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLCache(t testing.TB) *MockACLCache {
type mockConstructorTestingTNewMockACLCache interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLCache creates a new instance of MockACLCache. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLCache(t mockConstructorTestingTNewMockACLCache) *MockACLCache {
mock := &MockACLCache{}
mock.Mock.Test(t)

View File

@ -1,10 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package autopilotevents
import (
testing "testing"
stream "github.com/hashicorp/consul/agent/consul/stream"
mock "github.com/stretchr/testify/mock"
)
@ -19,8 +17,13 @@ func (_m *MockPublisher) Publish(_a0 []stream.Event) {
_m.Called(_a0)
}
// NewMockPublisher creates a new instance of MockPublisher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockPublisher(t testing.TB) *MockPublisher {
type mockConstructorTestingTNewMockPublisher interface {
mock.TestingT
Cleanup(func())
}
// NewMockPublisher creates a new instance of MockPublisher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockPublisher(t mockConstructorTestingTNewMockPublisher) *MockPublisher {
mock := &MockPublisher{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package autopilotevents
@ -10,8 +10,6 @@ import (
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
types "github.com/hashicorp/consul/types"
)
@ -80,8 +78,13 @@ func (_m *MockStateStore) NodeService(ws memdb.WatchSet, nodeName string, servic
return r0, r1, r2
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t testing.TB) *MockStateStore {
type mockConstructorTestingTNewMockStateStore interface {
mock.TestingT
Cleanup(func())
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore {
mock := &MockStateStore{}
mock.Mock.Test(t)

View File

@ -1,13 +1,11 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package autopilotevents
import (
testing "testing"
time "time"
mock "github.com/stretchr/testify/mock"
time "time"
)
// mockTimeProvider is an autogenerated mock type for the timeProvider type
@ -29,8 +27,13 @@ func (_m *mockTimeProvider) Now() time.Time {
return r0
}
// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func newMockTimeProvider(t testing.TB) *mockTimeProvider {
type mockConstructorTestingTnewMockTimeProvider interface {
mock.TestingT
Cleanup(func())
}
// newMockTimeProvider creates a new instance of mockTimeProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func newMockTimeProvider(t mockConstructorTestingTnewMockTimeProvider) *mockTimeProvider {
mock := &mockTimeProvider{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package watch
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockStateStore is an autogenerated mock type for the StateStore type
type MockStateStore struct {
@ -29,8 +25,13 @@ func (_m *MockStateStore) AbandonCh() <-chan struct{} {
return r0
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t testing.TB) *MockStateStore {
type mockConstructorTestingTNewMockStateStore interface {
mock.TestingT
Cleanup(func())
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t mockConstructorTestingTNewMockStateStore) *MockStateStore {
mock := &MockStateStore{}
mock.Mock.Test(t)

View File

@ -213,6 +213,10 @@ func (l *SessionLimiter) deleteSessionWithID(id uint64) {
l.deleteSessionLocked(idx, id)
}
// SessionTerminatedChan is a channel that will be closed to notify session-
// holders that a session has been terminated.
type SessionTerminatedChan <-chan struct{}
// Session allows its holder to perform an operation (e.g. serve a gRPC stream)
// concurrenly with other session-holders. Sessions may be terminated abruptly
// by the SessionLimiter, so it is the responsibility of the holder to receive
@ -228,7 +232,7 @@ type Session interface {
//
// The session-holder MUST receive on it and exit (e.g. close the gRPC stream)
// when it is closed.
Terminated() <-chan struct{}
Terminated() SessionTerminatedChan
}
type session struct {
@ -240,6 +244,6 @@ type session struct {
func (s *session) End() { s.l.deleteSessionWithID(s.id) }
func (s *session) Terminated() <-chan struct{} { return s.termCh }
func (s *session) Terminated() SessionTerminatedChan { return s.termCh }
func (s *session) terminate() { close(s.termCh) }

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package acl
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockLogin is an autogenerated mock type for the Login type
@ -39,8 +37,13 @@ func (_m *MockLogin) TokenForVerifiedIdentity(identity *authmethod.Identity, aut
return r0, r1
}
// NewMockLogin creates a new instance of MockLogin. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockLogin(t testing.TB) *MockLogin {
type mockConstructorTestingTNewMockLogin interface {
mock.TestingT
Cleanup(func())
}
// NewMockLogin creates a new instance of MockLogin. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockLogin(t mockConstructorTestingTNewMockLogin) *MockLogin {
mock := &MockLogin{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package acl
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockTokenWriter is an autogenerated mock type for the TokenWriter type
type MockTokenWriter struct {
@ -27,8 +23,13 @@ func (_m *MockTokenWriter) Delete(secretID string, fromLogout bool) error {
return r0
}
// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTokenWriter(t testing.TB) *MockTokenWriter {
type mockConstructorTestingTNewMockTokenWriter interface {
mock.TestingT
Cleanup(func())
}
// NewMockTokenWriter creates a new instance of MockTokenWriter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockTokenWriter(t mockConstructorTestingTNewMockTokenWriter) *MockTokenWriter {
mock := &MockTokenWriter{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package acl
@ -8,8 +8,6 @@ import (
authmethod "github.com/hashicorp/consul/agent/consul/authmethod"
mock "github.com/stretchr/testify/mock"
testing "testing"
)
// MockValidator is an autogenerated mock type for the Validator type
@ -40,8 +38,13 @@ func (_m *MockValidator) ValidateLogin(ctx context.Context, loginToken string) (
return r0, r1
}
// NewMockValidator creates a new instance of MockValidator. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockValidator(t testing.TB) *MockValidator {
type mockConstructorTestingTNewMockValidator interface {
mock.TestingT
Cleanup(func())
}
// NewMockValidator creates a new instance of MockValidator. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockValidator(t mockConstructorTestingTNewMockValidator) *MockValidator {
mock := &MockValidator{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package connectca
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package connectca
@ -8,8 +8,6 @@ import (
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
x509 "crypto/x509"
)
@ -41,8 +39,13 @@ func (_m *MockCAManager) AuthorizeAndSignCertificate(csr *x509.CertificateReques
return r0, r1
}
// NewMockCAManager creates a new instance of MockCAManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockCAManager(t testing.TB) *MockCAManager {
type mockConstructorTestingTNewMockCAManager interface {
mock.TestingT
Cleanup(func())
}
// NewMockCAManager creates a new instance of MockCAManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockCAManager(t mockConstructorTestingTNewMockCAManager) *MockCAManager {
mock := &MockCAManager{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package dataplane
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package peerstream
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package serverdiscovery
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(_a0 string, _a1 *acl.Enter
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.13.1. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package hcp

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package scada

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
@ -44,13 +45,24 @@ func NewConfigSource(cfg Config) *ConfigSource {
// Watch wraps the underlying proxycfg.Manager and dynamically registers
// services from the catalog with it when requested by the xDS server.
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
// If the service is registered to the local agent, use the LocalConfigSource
// rather than trying to configure it from the catalog.
if nodeName == m.NodeName && m.LocalState.ServiceExists(serviceID) {
return m.LocalConfigSource.Watch(serviceID, nodeName, token)
}
// Begin a session with the xDS session concurrency limiter.
//
// We do this here rather than in the xDS server because we don't want to apply
// the limit to services from the LocalConfigSource.
//
// See: https://github.com/hashicorp/consul/issues/15753
session, err := m.SessionLimiter.BeginSession()
if err != nil {
return nil, nil, nil, err
}
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
@ -66,6 +78,7 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token
cancelOnce.Do(func() {
cancelWatch()
m.cleanup(proxyID)
session.End()
})
}
@ -82,11 +95,12 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, token
if err := m.startSync(w.closeCh, proxyID); err != nil {
delete(m.watches, proxyID)
cancelWatch()
return nil, nil, err
session.End()
return nil, nil, nil, err
}
}
return snapCh, cancel, nil
return snapCh, session.Terminated(), cancel, nil
}
func (m *ConfigSource) Shutdown() {
@ -261,6 +275,9 @@ type Config struct {
// Logger will be used to write log messages.
Logger hclog.Logger
// SessionLimiter is used to enforce xDS concurrency limits.
SessionLimiter SessionLimiter
}
//go:generate mockery --name ConfigManager --inpackage
@ -278,5 +295,10 @@ type Store interface {
//go:generate mockery --name Watcher --inpackage
type Watcher interface {
Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error)
Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)
}
//go:generate mockery --name SessionLimiter --inpackage
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
}

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
@ -47,16 +48,33 @@ func TestConfigSource_Success(t *testing.T) {
// behavior without sleeping which leads to slow/racy tests.
cfgMgr := testConfigManager(t, serviceID, nodeName, token)
lim := NewMockSessionLimiter(t)
session1 := newMockSession(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2TermCh := make(limiter.SessionTerminatedChan)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token)
snapCh, termCh, cancelWatch1, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's inital port.
select {
@ -111,8 +129,9 @@ func TestConfigSource_Success(t *testing.T) {
}
// Start another watch.
_, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
_, termCh2, cancelWatch2, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
@ -137,6 +156,9 @@ func TestConfigSource_Success(t *testing.T) {
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for service to be de-registered")
}
session1.AssertCalled(t, "End")
session2.AssertCalled(t, "End")
}
func TestConfigSource_LocallyManagedService(t *testing.T) {
@ -149,7 +171,7 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", serviceID, nodeName, token).
Return(make(<-chan *proxycfg.ConfigSnapshot), proxycfg.CancelFunc(func() {}), nil)
Return(make(<-chan *proxycfg.ConfigSnapshot), nil, proxycfg.CancelFunc(func() {}), nil)
mgr := NewConfigSource(Config{
NodeName: nodeName,
@ -157,10 +179,11 @@ func TestConfigSource_LocallyManagedService(t *testing.T) {
LocalConfigSource: localWatcher,
Logger: hclog.NewNullLogger(),
GetStore: func() Store { panic("state store shouldn't have been used") },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, err := mgr.Watch(serviceID, nodeName, token)
_, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.NoError(t, err)
}
@ -192,17 +215,26 @@ func TestConfigSource_ErrorRegisteringService(t *testing.T) {
cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("KABOOM"))
session := newMockSession(t)
session.On("End").Return()
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(session, nil)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
_, _, err := mgr.Watch(serviceID, nodeName, token)
_, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.Error(t, err)
require.True(t, canceledWatch, "watch should've been canceled")
session.AssertCalled(t, "End")
}
func TestConfigSource_NotProxyService(t *testing.T) {
@ -231,19 +263,38 @@ func TestConfigSource_NotProxyService(t *testing.T) {
Return(make(<-chan *proxycfg.ConfigSnapshot), cancel)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, err := mgr.Watch(serviceID, nodeName, token)
_, _, _, err := mgr.Watch(serviceID, nodeName, token)
require.Error(t, err)
require.Contains(t, err.Error(), "must be a sidecar proxy or gateway")
require.True(t, canceledWatch, "watch should've been canceled")
}
func TestConfigSource_SessionLimiterError(t *testing.T) {
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached)
src := NewConfigSource(Config{
LocalState: testLocalState(t),
SessionLimiter: lim,
})
t.Cleanup(src.Shutdown)
_, _, _, err := src.Watch(
structs.NewServiceID("web-sidecar-proxy-1", nil),
"node-name",
"token",
)
require.Equal(t, limiter.ErrCapacityReached, err)
}
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) ConfigManager {
t.Helper()
@ -294,3 +345,34 @@ func testLocalState(t *testing.T) *local.State {
l.TriggerSyncChanges = func() {}
return l
}
type nullSessionLimiter struct{}
func (nullSessionLimiter) BeginSession() (limiter.Session, error) {
return nullSession{}, nil
}
type nullSession struct{}
func (nullSession) End() {}
func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil }
type mockSession struct {
mock.Mock
}
func newMockSession(t *testing.T) *mockSession {
m := &mockSession{}
m.Mock.Test(t)
t.Cleanup(func() { m.AssertExpectations(t) })
return m
}
func (m *mockSession) End() { m.Called() }
func (m *mockSession) Terminated() limiter.SessionTerminatedChan {
return m.Called().Get(0).(limiter.SessionTerminatedChan)
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package catalog
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockConfigManager is an autogenerated mock type for the ConfigManager type
@ -60,8 +58,13 @@ func (_m *MockConfigManager) Watch(req proxycfg.ProxyID) (<-chan *proxycfg.Confi
return r0, r1
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t testing.TB) *MockConfigManager {
type mockConstructorTestingTNewMockConfigManager interface {
mock.TestingT
Cleanup(func())
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager {
mock := &MockConfigManager{}
mock.Mock.Test(t)

View File

@ -0,0 +1,51 @@
// Code generated by mockery v2.15.0. DO NOT EDIT.
package catalog
import (
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
)
// MockSessionLimiter is an autogenerated mock type for the SessionLimiter type
type MockSessionLimiter struct {
mock.Mock
}
// BeginSession provides a mock function with given fields:
func (_m *MockSessionLimiter) BeginSession() (limiter.Session, error) {
ret := _m.Called()
var r0 limiter.Session
if rf, ok := ret.Get(0).(func() limiter.Session); ok {
r0 = rf()
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(limiter.Session)
}
}
var r1 error
if rf, ok := ret.Get(1).(func() error); ok {
r1 = rf()
} else {
r1 = ret.Error(1)
}
return r0, r1
}
type mockConstructorTestingTNewMockSessionLimiter interface {
mock.TestingT
Cleanup(func())
}
// NewMockSessionLimiter creates a new instance of MockSessionLimiter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockSessionLimiter(t mockConstructorTestingTNewMockSessionLimiter) *MockSessionLimiter {
mock := &MockSessionLimiter{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -1,14 +1,14 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package catalog
import (
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
limiter "github.com/hashicorp/consul/agent/grpc-external/limiter"
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
proxycfg "github.com/hashicorp/consul/agent/proxycfg"
testing "testing"
structs "github.com/hashicorp/consul/agent/structs"
)
// MockWatcher is an autogenerated mock type for the Watcher type
@ -17,7 +17,7 @@ type MockWatcher struct {
}
// Watch provides a mock function with given fields: proxyID, nodeName, token
func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
ret := _m.Called(proxyID, nodeName, token)
var r0 <-chan *proxycfg.ConfigSnapshot
@ -29,27 +29,41 @@ func (_m *MockWatcher) Watch(proxyID structs.ServiceID, nodeName string, token s
}
}
var r1 proxycfg.CancelFunc
if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok {
var r1 limiter.SessionTerminatedChan
if rf, ok := ret.Get(1).(func(structs.ServiceID, string, string) limiter.SessionTerminatedChan); ok {
r1 = rf(proxyID, nodeName, token)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(proxycfg.CancelFunc)
r1 = ret.Get(1).(limiter.SessionTerminatedChan)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) error); ok {
var r2 proxycfg.CancelFunc
if rf, ok := ret.Get(2).(func(structs.ServiceID, string, string) proxycfg.CancelFunc); ok {
r2 = rf(proxyID, nodeName, token)
} else {
r2 = ret.Error(2)
if ret.Get(2) != nil {
r2 = ret.Get(2).(proxycfg.CancelFunc)
}
}
return r0, r1, r2
var r3 error
if rf, ok := ret.Get(3).(func(structs.ServiceID, string, string) error); ok {
r3 = rf(proxyID, nodeName, token)
} else {
r3 = ret.Error(3)
}
return r0, r1, r2, r3
}
// NewMockWatcher creates a new instance of MockWatcher. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockWatcher(t testing.TB) *MockWatcher {
type mockConstructorTestingTNewMockWatcher interface {
mock.TestingT
Cleanup(func())
}
// NewMockWatcher creates a new instance of MockWatcher. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockWatcher(t mockConstructorTestingTNewMockWatcher) *MockWatcher {
mock := &MockWatcher{}
mock.Mock.Test(t)

View File

@ -1,6 +1,7 @@
package local
import (
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
structs "github.com/hashicorp/consul/agent/structs"
)
@ -16,7 +17,7 @@ func NewConfigSource(cfgMgr ConfigManager) *ConfigSource {
return &ConfigSource{cfgMgr}
}
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
watchCh, cancelWatch := m.manager.Watch(proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
@ -27,5 +28,5 @@ func (m *ConfigSource) Watch(serviceID structs.ServiceID, nodeName string, _ str
// is checked before the watch is created).
Token: "",
})
return watchCh, cancelWatch, nil
return watchCh, nil, cancelWatch, nil
}

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package local
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockConfigManager is an autogenerated mock type for the ConfigManager type
@ -76,8 +74,13 @@ func (_m *MockConfigManager) Watch(id proxycfg.ProxyID) (<-chan *proxycfg.Config
return r0, r1
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t testing.TB) *MockConfigManager {
type mockConstructorTestingTNewMockConfigManager interface {
mock.TestingT
Cleanup(func())
}
// NewMockConfigManager creates a new instance of MockConfigManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockConfigManager(t mockConstructorTestingTNewMockConfigManager) *MockConfigManager {
mock := &MockConfigManager{}
mock.Mock.Test(t)

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package submatview
@ -7,8 +7,6 @@ import (
mock "github.com/stretchr/testify/mock"
resolver "github.com/hashicorp/consul/acl/resolver"
testing "testing"
)
// MockACLResolver is an autogenerated mock type for the ACLResolver type
@ -37,8 +35,13 @@ func (_m *MockACLResolver) ResolveTokenAndDefaultMeta(token string, entMeta *acl
return r0, r1
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t testing.TB) *MockACLResolver {
type mockConstructorTestingTNewMockACLResolver interface {
mock.TestingT
Cleanup(func())
}
// NewMockACLResolver creates a new instance of MockACLResolver. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockACLResolver(t mockConstructorTestingTNewMockACLResolver) *MockACLResolver {
mock := &MockACLResolver{}
mock.Mock.Test(t)

View File

@ -3,6 +3,7 @@ package xds
import (
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"sync"
"sync/atomic"
@ -23,6 +24,7 @@ import (
"google.golang.org/grpc/status"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/serverlessplugin"
@ -89,17 +91,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return err
}
session, err := s.SessionLimiter.BeginSession()
if err != nil {
return errOverwhelmed
}
defer session.End()
// Loop state
var (
cfgSnap *proxycfg.ConfigSnapshot
node *envoy_config_core_v3.Node
stateCh <-chan *proxycfg.ConfigSnapshot
drainCh limiter.SessionTerminatedChan
watchCancel func()
proxyID structs.ServiceID
nonce uint64 // xDS requires a unique nonce to correlate response/request pairs
@ -177,7 +174,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
for {
select {
case <-session.Terminated():
case <-drainCh:
generator.Logger.Debug("draining stream to rebalance load")
metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
return errOverwhelmed
@ -296,8 +293,11 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
stateCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token)
if err != nil {
stateCh, drainCh, watchCancel, err = s.CfgSrc.Watch(proxyID, nodeName, options.Token)
switch {
case errors.Is(err, limiter.ErrCapacityReached):
return errOverwhelmed
case err != nil:
return status.Errorf(codes.Internal, "failed to watch proxy service: %s", err)
}
// Note that in this case we _intend_ the defer to only be triggered when

View File

@ -32,12 +32,11 @@ import (
func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
for _, serverlessPluginEnabled := range []bool{false, true} {
t.Run(fmt.Sprintf("serverless patcher: %t", serverlessPluginEnabled), func(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -238,7 +237,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -370,7 +369,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -522,7 +521,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
@ -667,7 +666,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -804,7 +803,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1062,7 +1061,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1140,7 +1139,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
false,
nil,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1240,7 +1238,6 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
false,
nil,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1321,7 +1318,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil)
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("ingress-gateway", nil)
@ -1376,12 +1373,13 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{})
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
mgr.RegisterProxy(t, sid)
mgr.DrainStreams(sid)
snap := newTestSnapshot(t, nil, "")
@ -1407,10 +1405,8 @@ func (capacityReachedLimiter) BeginSession() (limiter.Session, error) {
}
func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
limiter := &testLimiter{}
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1439,7 +1435,7 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
})
testutil.RunStep(t, "terminate limiter session", func(t *testing.T) {
limiter.TerminateSession()
mgr.DrainStreams(sid)
select {
case err := <-errCh:
@ -1476,25 +1472,6 @@ func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
}
type testLimiter struct {
termCh chan struct{}
}
func (t *testLimiter) BeginSession() (limiter.Session, error) {
t.termCh = make(chan struct{})
return &testSession{termCh: t.termCh}, nil
}
func (t *testLimiter) TerminateSession() { close(t.termCh) }
type testSession struct {
termCh chan struct{}
}
func (t *testSession) Terminated() <-chan struct{} { return t.termCh }
func (*testSession) End() {}
func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) {
t.Helper()
select {

View File

@ -109,13 +109,7 @@ type ConfigFetcher interface {
// ProxyConfigSource is the interface xds.Server requires to consume proxy
// config updates.
type ProxyConfigSource interface {
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error)
}
// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend
// on an interface rather than the concrete type so we can mock it in tests.
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error)
}
// Server represents a gRPC server that can handle xDS requests from Envoy. All
@ -124,12 +118,11 @@ type SessionLimiter interface {
// A full description of the XDS protocol can be found at
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol
type Server struct {
NodeName string
Logger hclog.Logger
CfgSrc ProxyConfigSource
ResolveToken ACLResolverFunc
CfgFetcher ConfigFetcher
SessionLimiter SessionLimiter
NodeName string
Logger hclog.Logger
CfgSrc ProxyConfigSource
ResolveToken ACLResolverFunc
CfgFetcher ConfigFetcher
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
@ -181,7 +174,6 @@ func NewServer(
cfgMgr ProxyConfigSource,
resolveToken ACLResolverFunc,
cfgFetcher ConfigFetcher,
limiter SessionLimiter,
) *Server {
return &Server{
NodeName: nodeName,
@ -189,7 +181,6 @@ func NewServer(
CfgSrc: cfgMgr,
ResolveToken: resolveToken,
CfgFetcher: cfgFetcher,
SessionLimiter: limiter,
AuthCheckFrequency: DefaultAuthCheckFrequency,
activeStreams: &activeStreamCounters{},
serverlessPluginEnabled: serverlessPluginEnabled,

View File

@ -67,14 +67,16 @@ func newTestSnapshot(
// testing. It also implements ConnectAuthz to allow control over authorization.
type testManager struct {
sync.Mutex
chans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot
cancels chan structs.ServiceID
stateChans map[structs.ServiceID]chan *proxycfg.ConfigSnapshot
drainChans map[structs.ServiceID]chan struct{}
cancels chan structs.ServiceID
}
func newTestManager(t *testing.T) *testManager {
return &testManager{
chans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{},
cancels: make(chan structs.ServiceID, 10),
stateChans: map[structs.ServiceID]chan *proxycfg.ConfigSnapshot{},
drainChans: map[structs.ServiceID]chan struct{}{},
cancels: make(chan structs.ServiceID, 10),
}
}
@ -82,7 +84,8 @@ func newTestManager(t *testing.T) *testManager {
func (m *testManager) RegisterProxy(t *testing.T, proxyID structs.ServiceID) {
m.Lock()
defer m.Unlock()
m.chans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1)
m.stateChans[proxyID] = make(chan *proxycfg.ConfigSnapshot, 1)
m.drainChans[proxyID] = make(chan struct{})
}
// Deliver simulates a proxy registration
@ -91,18 +94,42 @@ func (m *testManager) DeliverConfig(t *testing.T, proxyID structs.ServiceID, cfg
m.Lock()
defer m.Unlock()
select {
case m.chans[proxyID] <- cfg:
case m.stateChans[proxyID] <- cfg:
case <-time.After(10 * time.Millisecond):
t.Fatalf("took too long to deliver config")
}
}
// Watch implements ConfigManager
func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) {
// DrainStreams drains any open streams for the given proxyID. If there aren't
// any open streams, it'll create a marker so that future attempts to watch the
// given proxyID will return limiter.ErrCapacityReached.
func (m *testManager) DrainStreams(proxyID structs.ServiceID) {
m.Lock()
defer m.Unlock()
ch, ok := m.drainChans[proxyID]
if !ok {
ch = make(chan struct{})
m.drainChans[proxyID] = ch
}
close(ch)
}
// Watch implements ConfigManager
func (m *testManager) Watch(proxyID structs.ServiceID, _ string, _ string) (<-chan *proxycfg.ConfigSnapshot, limiter.SessionTerminatedChan, proxycfg.CancelFunc, error) {
m.Lock()
defer m.Unlock()
// If the drain chan has already been closed, return limiter.ErrCapacityReached.
drainCh := m.drainChans[proxyID]
select {
case <-drainCh:
return nil, nil, nil, limiter.ErrCapacityReached
default:
}
// ch might be nil but then it will just block forever
return m.chans[proxyID], func() {
return m.stateChans[proxyID], drainCh, func() {
m.cancels <- proxyID
}, nil
}
@ -137,7 +164,6 @@ func newTestServerDeltaScenario(
token string,
authCheckFrequency time.Duration,
serverlessPluginEnabled bool,
sessionLimiter SessionLimiter,
) *testServerScenario {
mgr := newTestManager(t)
envoy := NewTestEnvoy(t, proxyID, token)
@ -156,10 +182,6 @@ func newTestServerDeltaScenario(
metrics.NewGlobal(cfg, sink)
})
if sessionLimiter == nil {
sessionLimiter = limiter.NewSessionLimiter()
}
s := NewServer(
"node-123",
testutil.Logger(t),
@ -167,7 +189,6 @@ func newTestServerDeltaScenario(
mgr,
resolveToken,
nil, /*cfgFetcher ConfigFetcher*/
sessionLimiter,
)
if authCheckFrequency > 0 {
s.AuthCheckFrequency = authCheckFrequency

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package pbdns
@ -8,8 +8,6 @@ import (
grpc "google.golang.org/grpc"
mock "github.com/stretchr/testify/mock"
testing "testing"
)
// MockDNSServiceClient is an autogenerated mock type for the DNSServiceClient type
@ -47,8 +45,13 @@ func (_m *MockDNSServiceClient) Query(ctx context.Context, in *QueryRequest, opt
return r0, r1
}
// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceClient(t testing.TB) *MockDNSServiceClient {
type mockConstructorTestingTNewMockDNSServiceClient interface {
mock.TestingT
Cleanup(func())
}
// NewMockDNSServiceClient creates a new instance of MockDNSServiceClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceClient(t mockConstructorTestingTNewMockDNSServiceClient) *MockDNSServiceClient {
mock := &MockDNSServiceClient{}
mock.Mock.Test(t)

View File

@ -1,10 +1,9 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package pbdns
import (
context "context"
testing "testing"
mock "github.com/stretchr/testify/mock"
)
@ -37,8 +36,13 @@ func (_m *MockDNSServiceServer) Query(_a0 context.Context, _a1 *QueryRequest) (*
return r0, r1
}
// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceServer(t testing.TB) *MockDNSServiceServer {
type mockConstructorTestingTNewMockDNSServiceServer interface {
mock.TestingT
Cleanup(func())
}
// NewMockDNSServiceServer creates a new instance of MockDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDNSServiceServer(t mockConstructorTestingTNewMockDNSServiceServer) *MockDNSServiceServer {
mock := &MockDNSServiceServer{}
mock.Mock.Test(t)

View File

@ -1,12 +1,8 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
// Code generated by mockery v2.15.0. DO NOT EDIT.
package pbdns
import (
testing "testing"
mock "github.com/stretchr/testify/mock"
)
import mock "github.com/stretchr/testify/mock"
// MockUnsafeDNSServiceServer is an autogenerated mock type for the UnsafeDNSServiceServer type
type MockUnsafeDNSServiceServer struct {
@ -18,8 +14,13 @@ func (_m *MockUnsafeDNSServiceServer) mustEmbedUnimplementedDNSServiceServer() {
_m.Called()
}
// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockUnsafeDNSServiceServer(t testing.TB) *MockUnsafeDNSServiceServer {
type mockConstructorTestingTNewMockUnsafeDNSServiceServer interface {
mock.TestingT
Cleanup(func())
}
// NewMockUnsafeDNSServiceServer creates a new instance of MockUnsafeDNSServiceServer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockUnsafeDNSServiceServer(t mockConstructorTestingTNewMockUnsafeDNSServiceServer) *MockUnsafeDNSServiceServer {
mock := &MockUnsafeDNSServiceServer{}
mock.Mock.Test(t)