// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package catalog

import (
	"context"
	"errors"
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"

	"github.com/hashicorp/consul/agent/consul/state"
	"github.com/hashicorp/consul/agent/consul/stream"
	"github.com/hashicorp/consul/agent/grpc-external/limiter"
	"github.com/hashicorp/consul/agent/local"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/token"
	proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
	rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
	pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
)

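// TestConfigSource_Success covers the happy path: a connect proxy registered
// in the catalog is watched, re-registered when its port or the global
// proxy-defaults change, and de-registered once the last watch is cancelled.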
func TestConfigSource_Success(t *testing.T) {
	serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
	nodeName := "node-name"
	token := "token"

	store := testStateStore(t)

	// Register the proxy in the catalog/state store at port 9999.
	require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
		Node: nodeName,
		Service: &structs.NodeService{
			ID:      serviceID.ID,
			Service: "web-sidecar-proxy",
			Port:    9999,
			Kind:    structs.ServiceKindConnectProxy,
			Proxy: structs.ConnectProxyConfig{
				Config: map[string]any{
					"local_connect_timeout_ms": 123,
				},
			},
		},
	}))

	// testConfigManager builds a ConfigManager that emits a ConfigSnapshot whenever
	// Register is called, and closes the watch channel when Deregister is called.
	//
	// Though a little odd, this allows us to make assertions on the sync goroutine's
	// behavior without sleeping, which would lead to slow/racy tests.
	cfgMgr := testConfigManager(t, serviceID, nodeName, token)

	lim := NewMockSessionLimiter(t)

	session1 := newMockSession(t)
	session1TermCh := make(limiter.SessionTerminatedChan)
	session1.On("Terminated").Return(session1TermCh)
	session1.On("End").Return()

	session2 := newMockSession(t)
	session2TermCh := make(limiter.SessionTerminatedChan)
	session2.On("Terminated").Return(session2TermCh)
	session2.On("End").Return()

	lim.On("BeginSession").Return(session1, nil).Once()
	lim.On("BeginSession").Return(session2, nil).Once()

	mgr := NewConfigSource(Config{
		Manager:        cfgMgr,
		LocalState:     testLocalState(t),
		Logger:         hclog.NewNullLogger(),
		GetStore:       func() Store { return store },
		SessionLimiter: lim,
	})
	t.Cleanup(mgr.Shutdown)

	snapCh, termCh, _, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
	require.NoError(t, err)
	require.Equal(t, session1TermCh, termCh)

	// Expect Register to have been called with the proxy's initial port.
	select {
	case snap := <-snapCh:
		require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port)
		require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for snapshot")
	}

	// Update the proxy's port to 8888.
	require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
		Node: nodeName,
		Service: &structs.NodeService{
			ID:      serviceID.ID,
			Service: "web-sidecar-proxy",
			Port:    8888,
			Kind:    structs.ServiceKindConnectProxy,
			Proxy: structs.ConnectProxyConfig{
				Config: map[string]any{
					"local_connect_timeout_ms": 123,
				},
			},
		},
	}))

	// Expect Register to have been called again with the proxy's new port.
	select {
	case snap := <-snapCh:
		require.Equal(t, 8888, snap.(*proxycfg.ConfigSnapshot).Port)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for snapshot")
	}

	// Update proxy-defaults.
	require.NoError(t, store.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
		Name: structs.ProxyConfigGlobal,
		Config: map[string]any{
			"max_inbound_connections": 321,
		},
	}))

	// Expect Register to have been called again with the new merged config.
	select {
	case snap := <-snapCh:
		require.Equal(t, map[string]any{
			"local_connect_timeout_ms": 123,
			"max_inbound_connections":  321,
		}, snap.(*proxycfg.ConfigSnapshot).Proxy.Config)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for snapshot")
	}

	// Start another watch.
	_, termCh2, _, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
	require.NoError(t, err)
	require.Equal(t, session2TermCh, termCh2)

	// Expect the service not to have been re-registered by the second watch.
	select {
	case <-snapCh:
		t.Fatal("service shouldn't have been re-registered")
	case <-time.After(100 * time.Millisecond):
	}

	// Expect cancelling the first watch to *not* de-register the service.
	cancelWatch1()
	select {
	case <-snapCh:
		t.Fatal("service shouldn't have been de-registered until other watch went away")
	case <-time.After(100 * time.Millisecond):
	}

	// Expect cancelling the other watch to de-register the service.
	cancelWatch2()
	select {
	case _, ok := <-snapCh:
		require.False(t, ok, "channel should've been closed")
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for service to be de-registered")
	}

	session1.AssertCalled(t, "End")
	session2.AssertCalled(t, "End")
}

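// TestConfigSource_LocallyManagedService verifies that watches for proxies
// registered with the local agent are delegated to the LocalConfigSource
// rather than served from the catalog/state store.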
func TestConfigSource_LocallyManagedService(t *testing.T) {
	serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
	proxyID := rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID()
	nodeName := "node-1"
	token := "token"

	localState := testLocalState(t)
	localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "", false)

	localWatcher := NewMockWatcher(t)
	localWatcher.On("Watch", proxyID, nodeName, token).
		Return(make(<-chan proxysnapshot.ProxySnapshot), nil, nil, proxysnapshot.CancelFunc(func() {}), nil)

	mgr := NewConfigSource(Config{
		NodeName:          nodeName,
		LocalState:        localState,
		LocalConfigSource: localWatcher,
		Logger:            hclog.NewNullLogger(),
		GetStore:          func() Store { panic("state store shouldn't have been used") },
		SessionLimiter:    nullSessionLimiter{},
	})
	t.Cleanup(mgr.Shutdown)

	_, _, _, _, err := mgr.Watch(proxyID, nodeName, token)
	require.NoError(t, err)
}

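// TestConfigSource_ErrorRegisteringService verifies that when the
// ConfigManager fails to register the proxy, Watch returns the error, the
// underlying watch is canceled, and the limiter session is ended.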
func TestConfigSource_ErrorRegisteringService(t *testing.T) {
	serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
	nodeName := "node-name"
	token := "token"

	store := testStateStore(t)

	require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
		Node: nodeName,
		Service: &structs.NodeService{
			ID:      serviceID.ID,
			Service: "web-sidecar-proxy",
			Port:    9999,
			Kind:    structs.ServiceKindConnectProxy,
		},
	}))

	var canceledWatch bool
	cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true })

	cfgMgr := NewMockConfigManager(t)

	cfgMgr.On("Watch", mock.Anything).
		Return(make(<-chan proxysnapshot.ProxySnapshot), cancel)

	cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Return(errors.New("KABOOM"))

	session := newMockSession(t)
	session.On("End").Return()

	lim := NewMockSessionLimiter(t)
	lim.On("BeginSession").Return(session, nil)

	mgr := NewConfigSource(Config{
		Manager:        cfgMgr,
		LocalState:     testLocalState(t),
		Logger:         hclog.NewNullLogger(),
		GetStore:       func() Store { return store },
		SessionLimiter: lim,
	})
	t.Cleanup(mgr.Shutdown)

	_, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
	require.Error(t, err)
	require.True(t, canceledWatch, "watch should've been canceled")

	session.AssertCalled(t, "End")
}

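// TestConfigSource_ErrorInSyncLoop verifies that a registration error inside
// the sync loop terminates every config source watching the proxy and closes
// their snapshot channels.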
func TestConfigSource_ErrorInSyncLoop(t *testing.T) {
	serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
	nodeName := "node-name"
	token := "token"

	store := testStateStore(t)

	// Register the proxy in the catalog/state store at port 9999.
	require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
		Node: nodeName,
		Service: &structs.NodeService{
			ID:      serviceID.ID,
			Service: "web-sidecar-proxy",
			Port:    9999,
			Kind:    structs.ServiceKindConnectProxy,
			Proxy: structs.ConnectProxyConfig{
				Config: map[string]any{
					"local_connect_timeout_ms": 123,
				},
			},
		},
	}))

	cfgMgr := NewMockConfigManager(t)
	{
		proxyID := proxycfg.ProxyID{
			ServiceID: serviceID,
			NodeName:  nodeName,
			Token:     token,
		}
		snapCh := make(chan proxysnapshot.ProxySnapshot, 1)
		cfgMgr.On("Watch", proxyID).
			Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil)

		// Answer the Register call successfully once, for the first session
		// (Repeatability = 1). The second session should not cause a
		// re-register to happen.
		cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
			Run(func(args mock.Arguments) {
				id := args.Get(0).(proxycfg.ProxyID)
				ns := args.Get(1).(*structs.NodeService)

				snapCh <- &proxycfg.ConfigSnapshot{
					ProxyID: id,
					Port:    ns.Port,
					Proxy:   ns.Proxy,
				}
			}).
			Return(nil).
			Repeatability = 1

		// Error on subsequent registrations afterwards (during the sync loop).
		cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
			Return(fmt.Errorf("intentional registration error"))

		cfgMgr.On("Deregister", proxyID, source).
			Run(func(mock.Arguments) { close(snapCh) }).
			Return()
	}

	lim := NewMockSessionLimiter(t)
	session1TermCh := make(limiter.SessionTerminatedChan)
	session2TermCh := make(limiter.SessionTerminatedChan)
	{
		session1 := newMockSession(t)
		session1.On("Terminated").Return(session1TermCh)
		session1.On("End").Return()

		session2 := newMockSession(t)
		session2.On("Terminated").Return(session2TermCh)
		session2.On("End").Return()

		lim.On("BeginSession").Return(session1, nil).Once()
		lim.On("BeginSession").Return(session2, nil).Once()
	}

	mgr := NewConfigSource(Config{
		Manager:        cfgMgr,
		LocalState:     testLocalState(t),
		Logger:         hclog.NewNullLogger(),
		GetStore:       func() Store { return store },
		SessionLimiter: lim,
	})
	t.Cleanup(mgr.Shutdown)

	snapCh, termCh, cfgSrcTerminated1, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
	require.NoError(t, err)
	require.Equal(t, session1TermCh, termCh)

	// Expect Register to have been called with the proxy's initial port.
	select {
	case snap := <-snapCh:
		require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port)
		require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for snapshot")
	}

	// Start another watch.
	_, termCh2, cfgSrcTerminated2, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
	require.NoError(t, err)
	require.Equal(t, session2TermCh, termCh2)

	// Expect the service not to have been re-registered by the second watch.
	select {
	case <-snapCh:
		t.Fatal("service shouldn't have been re-registered")
	case <-time.After(100 * time.Millisecond):
	}

	// Ensure that no config-source syncLoops were terminated.
	select {
	case <-cfgSrcTerminated1:
		t.Fatal("unexpected config-source termination 1")
	case <-cfgSrcTerminated2:
		t.Fatal("unexpected config-source termination 2")
	default:
	}

	// Update the proxy's port to 8888.
	// This should trigger the config-source syncLoop termination channel due to an error.
	require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
		Node: nodeName,
		Service: &structs.NodeService{
			ID:      serviceID.ID,
			Service: "web-sidecar-proxy",
			Port:    8888,
			Kind:    structs.ServiceKindConnectProxy,
			Proxy: structs.ConnectProxyConfig{
				Config: map[string]any{
					"local_connect_timeout_ms": 123,
				},
			},
		},
	}))

	// Expect both config sources to have terminated when the syncLoop errors.
	select {
	case _, ok := <-cfgSrcTerminated1:
		cancelWatch1()
		require.False(t, ok)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for config-source termination 1")
	}
	select {
	case _, ok := <-cfgSrcTerminated2:
		cancelWatch2()
		require.False(t, ok)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("timeout waiting for config-source termination 2")
	}

	// Expect the snap channel to have been closed.
	select {
	case _, ok := <-snapCh:
		require.False(t, ok)
	case <-time.After(100 * time.Millisecond):
		t.Fatal("snap channel was not closed")
	}
}

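// TestConfigSource_NotProxyService verifies that watching a service that is
// not a sidecar proxy or gateway fails and cancels the underlying watch.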
func TestConfigSource_NotProxyService(t *testing.T) {
	serviceID := structs.NewServiceID("web", nil)
	nodeName := "node-name"
	token := "token"

	store := testStateStore(t)

	require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
		Node: nodeName,
		Service: &structs.NodeService{
			ID:      serviceID.ID,
			Service: "web",
			Port:    9999,
			Kind:    structs.ServiceKindTypical,
		},
	}))

	var canceledWatch bool
	cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true })

	cfgMgr := NewMockConfigManager(t)

	cfgMgr.On("Watch", mock.Anything).
		Return(make(<-chan proxysnapshot.ProxySnapshot), cancel)

	mgr := NewConfigSource(Config{
		Manager:        cfgMgr,
		LocalState:     testLocalState(t),
		Logger:         hclog.NewNullLogger(),
		GetStore:       func() Store { return store },
		SessionLimiter: nullSessionLimiter{},
	})
	t.Cleanup(mgr.Shutdown)

	_, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
	require.Error(t, err)
	require.Contains(t, err.Error(), "must be a sidecar proxy or gateway")
	require.True(t, canceledWatch, "watch should've been canceled")
}

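// TestConfigSource_SessionLimiterError verifies that Watch propagates
// ErrCapacityReached when the session limiter refuses to begin a session.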
func TestConfigSource_SessionLimiterError(t *testing.T) {
	lim := NewMockSessionLimiter(t)
	lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached)

	src := NewConfigSource(Config{
		LocalState:     testLocalState(t),
		SessionLimiter: lim,
	})
	t.Cleanup(src.Shutdown)

	_, _, _, _, err := src.Watch(
		rtest.Resource(pbmesh.ProxyConfigurationType, "web-sidecar-proxy-1").ID(),
		"node-name",
		"token",
	)
	require.Equal(t, limiter.ErrCapacityReached, err)
}

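// testConfigManager returns a MockConfigManager whose Register call emits a
// ConfigSnapshot on the watch channel and whose Deregister call closes it,
// so tests can observe the sync goroutine without sleeping.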
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) *MockConfigManager {
	t.Helper()

	cfgMgr := NewMockConfigManager(t)

	proxyID := proxycfg.ProxyID{
		ServiceID: serviceID,
		NodeName:  nodeName,
		Token:     token,
	}

	snapCh := make(chan proxysnapshot.ProxySnapshot, 1)
	cfgMgr.On("Watch", proxyID).
		Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil)

	cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
		Run(func(args mock.Arguments) {
			id := args.Get(0).(proxycfg.ProxyID)
			ns := args.Get(1).(*structs.NodeService)

			snapCh <- &proxycfg.ConfigSnapshot{
				ProxyID: id,
				Port:    ns.Port,
				Proxy:   ns.Proxy,
			}
		}).
		Return(nil)

	cfgMgr.On("Deregister", proxyID, source).
		Run(func(mock.Arguments) { close(snapCh) }).
		Return()

	return cfgMgr
}

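// testStateStore returns an in-memory state store suitable for registering
// catalog entries in tests.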
func testStateStore(t *testing.T) *state.Store {
	t.Helper()

	gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
	require.NoError(t, err)
	return state.NewStateStoreWithEventPublisher(gc, stream.NoOpEventPublisher{})
}

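// testLocalState returns an agent-local state with sync notifications
// stubbed out.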
func testLocalState(t *testing.T) *local.State {
	t.Helper()

	l := local.NewState(local.Config{}, hclog.NewNullLogger(), &token.Store{})
	l.TriggerSyncChanges = func() {}
	return l
}

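// nullSessionLimiter is a no-op session limiter whose sessions never
// terminate, for tests that don't exercise session-limiting behavior.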
type nullSessionLimiter struct{}

func (nullSessionLimiter) BeginSession() (limiter.Session, error) {
	return nullSession{}, nil
}

func (nullSessionLimiter) Run(ctx context.Context) {}

type nullSession struct{}

func (nullSession) End() {}

func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil }

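// mockSession is a testify mock of a limiter session that records End calls
// and returns a caller-supplied termination channel.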
type mockSession struct {
	mock.Mock
}

func newMockSession(t *testing.T) *mockSession {
	m := &mockSession{}
	m.Mock.Test(t)

	t.Cleanup(func() { m.AssertExpectations(t) })

	return m
}

func (m *mockSession) End() { m.Called() }

func (m *mockSession) Terminated() limiter.SessionTerminatedChan {
	return m.Called().Get(0).(limiter.SessionTerminatedChan)
}