Consul is a distributed, highly available, and data center aware solution to connect and configure applications across dynamic, distributed infrastructure.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

549 lines
16 KiB

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package catalog
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
proxysnapshot "github.com/hashicorp/consul/internal/mesh/proxy-snapshot"
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
)
func TestConfigSource_Success(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
// Register the proxy in the catalog/state store at port 9999.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// testConfigManager builds a ConfigManager that emits a ConfigSnapshot whenever
// Register is called, and closes the watch channel when Deregister is called.
//
// Though a little odd, this allows us to make assertions on the sync goroutine's
// behavior without sleeping which leads to slow/racy tests.
cfgMgr := testConfigManager(t, serviceID, nodeName, token)
lim := NewMockSessionLimiter(t)
session1 := newMockSession(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2TermCh := make(limiter.SessionTerminatedChan)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, termCh, _, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's inital port.
select {
case snap := <-snapCh:
require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port)
require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Update the proxy's port to 8888.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 8888,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// Expect Register to have been called again with the proxy's new port.
select {
case snap := <-snapCh:
require.Equal(t, 8888, snap.(*proxycfg.ConfigSnapshot).Port)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Update proxy-defaults.
require.NoError(t, store.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Name: structs.ProxyConfigGlobal,
Config: map[string]any{
"max_inbound_connections": 321,
},
}))
// Expect Register to have been called again with the new merged config.
select {
case snap := <-snapCh:
require.Equal(t, map[string]any{
"local_connect_timeout_ms": 123,
"max_inbound_connections": 321,
}, snap.(*proxycfg.ConfigSnapshot).Proxy.Config)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Start another watch.
_, termCh2, _, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
case <-snapCh:
t.Fatal("service shouldn't have been re-registered")
case <-time.After(100 * time.Millisecond):
}
// Expect cancelling the first watch to *not* de-register the service.
cancelWatch1()
select {
case <-snapCh:
t.Fatal("service shouldn't have been de-registered until other watch went away")
case <-time.After(100 * time.Millisecond):
}
// Expect cancelling the other watch to de-register the service.
cancelWatch2()
select {
case _, ok := <-snapCh:
require.False(t, ok, "channel should've been closed")
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for service to be de-registered")
}
session1.AssertCalled(t, "End")
session2.AssertCalled(t, "End")
}
func TestConfigSource_LocallyManagedService(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
proxyID := rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID()
nodeName := "node-1"
token := "token"
localState := testLocalState(t)
localState.AddServiceWithChecks(&structs.NodeService{ID: serviceID.ID}, nil, "", false)
localWatcher := NewMockWatcher(t)
localWatcher.On("Watch", proxyID, nodeName, token).
Return(make(<-chan proxysnapshot.ProxySnapshot), nil, nil, proxysnapshot.CancelFunc(func() {}), nil)
mgr := NewConfigSource(Config{
NodeName: nodeName,
LocalState: localState,
LocalConfigSource: localWatcher,
Logger: hclog.NewNullLogger(),
GetStore: func() Store { panic("state store shouldn't have been used") },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, _, _, err := mgr.Watch(proxyID, nodeName, token)
require.NoError(t, err)
}
func TestConfigSource_ErrorRegisteringService(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
},
}))
var canceledWatch bool
cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true })
cfgMgr := NewMockConfigManager(t)
cfgMgr.On("Watch", mock.Anything).
Return(make(<-chan proxysnapshot.ProxySnapshot), cancel)
cfgMgr.On("Register", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("KABOOM"))
session := newMockSession(t)
session.On("End").Return()
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(session, nil)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
_, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.Error(t, err)
require.True(t, canceledWatch, "watch should've been canceled")
session.AssertCalled(t, "End")
}
func TestConfigSource_ErrorInSyncLoop(t *testing.T) {
serviceID := structs.NewServiceID("web-sidecar-proxy-1", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
// Register the proxy in the catalog/state store at port 9999.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 9999,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
cfgMgr := NewMockConfigManager(t)
{
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
Token: token,
}
snapCh := make(chan proxysnapshot.ProxySnapshot, 1)
cfgMgr.On("Watch", proxyID).
Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil)
// Answer the register call successfully for session 1 starting (Repeatability = 1).
// Session 2 should not have caused a re-register to happen.
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Run(func(args mock.Arguments) {
id := args.Get(0).(proxycfg.ProxyID)
ns := args.Get(1).(*structs.NodeService)
snapCh <- &proxycfg.ConfigSnapshot{
ProxyID: id,
Port: ns.Port,
Proxy: ns.Proxy,
}
}).
Return(nil).
Repeatability = 1
// Error on subsequent registrations afterwards (during the sync loop).
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Return(fmt.Errorf("intentional registration error"))
cfgMgr.On("Deregister", proxyID, source).
Run(func(mock.Arguments) { close(snapCh) }).
Return()
}
lim := NewMockSessionLimiter(t)
session1TermCh := make(limiter.SessionTerminatedChan)
session2TermCh := make(limiter.SessionTerminatedChan)
{
session1 := newMockSession(t)
session1.On("Terminated").Return(session1TermCh)
session1.On("End").Return()
session2 := newMockSession(t)
session2.On("Terminated").Return(session2TermCh)
session2.On("End").Return()
lim.On("BeginSession").Return(session1, nil).Once()
lim.On("BeginSession").Return(session2, nil).Once()
}
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: lim,
})
t.Cleanup(mgr.Shutdown)
snapCh, termCh, cfgSrcTerminated1, cancelWatch1, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session1TermCh, termCh)
// Expect Register to have been called with the proxy's inital port.
select {
case snap := <-snapCh:
require.Equal(t, 9999, snap.(*proxycfg.ConfigSnapshot).Port)
require.Equal(t, token, snap.(*proxycfg.ConfigSnapshot).ProxyID.Token)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for snapshot")
}
// Start another watch.
_, termCh2, cfgSrcTerminated2, cancelWatch2, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.NoError(t, err)
require.Equal(t, session2TermCh, termCh2)
// Expect the service to have not been re-registered by the second watch.
select {
case <-snapCh:
t.Fatal("service shouldn't have been re-registered")
case <-time.After(100 * time.Millisecond):
}
// Ensure that no config-source syncLoops were terminated.
select {
case <-cfgSrcTerminated1:
t.Fatal("unexpected config-source termination 1")
case <-cfgSrcTerminated2:
t.Fatal("unexpected config-source termination 2")
default:
}
// Update the proxy's port to 8888.
// This should trigger the config-source syncLoop termination channel due to an error.
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web-sidecar-proxy",
Port: 8888,
Kind: structs.ServiceKindConnectProxy,
Proxy: structs.ConnectProxyConfig{
Config: map[string]any{
"local_connect_timeout_ms": 123,
},
},
},
}))
// Expect both config sources to have terminated when the syncLoop errors.
select {
case _, ok := <-cfgSrcTerminated1:
cancelWatch1()
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for config-source termination 1")
}
select {
case _, ok := <-cfgSrcTerminated2:
cancelWatch2()
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting for config-source termination 2")
}
// Expect the snap channels to have been closed.
select {
case _, ok := <-snapCh:
require.False(t, ok)
case <-time.After(100 * time.Millisecond):
t.Fatal("snap channel was not closed")
}
}
func TestConfigSource_NotProxyService(t *testing.T) {
serviceID := structs.NewServiceID("web", nil)
nodeName := "node-name"
token := "token"
store := testStateStore(t)
require.NoError(t, store.EnsureRegistration(0, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceID.ID,
Service: "web",
Port: 9999,
Kind: structs.ServiceKindTypical,
},
}))
var canceledWatch bool
cancel := proxysnapshot.CancelFunc(func() { canceledWatch = true })
cfgMgr := NewMockConfigManager(t)
cfgMgr.On("Watch", mock.Anything).
Return(make(<-chan proxysnapshot.ProxySnapshot), cancel)
mgr := NewConfigSource(Config{
Manager: cfgMgr,
LocalState: testLocalState(t),
Logger: hclog.NewNullLogger(),
GetStore: func() Store { return store },
SessionLimiter: nullSessionLimiter{},
})
t.Cleanup(mgr.Shutdown)
_, _, _, _, err := mgr.Watch(rtest.Resource(pbmesh.ProxyConfigurationType, serviceID.ID).ID(), nodeName, token)
require.Error(t, err)
require.Contains(t, err.Error(), "must be a sidecar proxy or gateway")
require.True(t, canceledWatch, "watch should've been canceled")
}
func TestConfigSource_SessionLimiterError(t *testing.T) {
lim := NewMockSessionLimiter(t)
lim.On("BeginSession").Return(nil, limiter.ErrCapacityReached)
src := NewConfigSource(Config{
LocalState: testLocalState(t),
SessionLimiter: lim,
})
t.Cleanup(src.Shutdown)
_, _, _, _, err := src.Watch(
rtest.Resource(pbmesh.ProxyConfigurationType, "web-sidecar-proxy-1").ID(),
"node-name",
"token",
)
require.Equal(t, limiter.ErrCapacityReached, err)
}
func testConfigManager(t *testing.T, serviceID structs.ServiceID, nodeName string, token string) *MockConfigManager {
t.Helper()
cfgMgr := NewMockConfigManager(t)
proxyID := proxycfg.ProxyID{
ServiceID: serviceID,
NodeName: nodeName,
Token: token,
}
snapCh := make(chan proxysnapshot.ProxySnapshot, 1)
cfgMgr.On("Watch", proxyID).
Return((<-chan proxysnapshot.ProxySnapshot)(snapCh), proxysnapshot.CancelFunc(func() {}), nil)
cfgMgr.On("Register", mock.Anything, mock.Anything, source, token, false).
Run(func(args mock.Arguments) {
id := args.Get(0).(proxycfg.ProxyID)
ns := args.Get(1).(*structs.NodeService)
snapCh <- &proxycfg.ConfigSnapshot{
ProxyID: id,
Port: ns.Port,
Proxy: ns.Proxy,
}
}).
Return(nil)
cfgMgr.On("Deregister", proxyID, source).
Run(func(mock.Arguments) { close(snapCh) }).
Return()
return cfgMgr
}
func testStateStore(t *testing.T) *state.Store {
t.Helper()
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
require.NoError(t, err)
return state.NewStateStoreWithEventPublisher(gc, stream.NoOpEventPublisher{})
}
func testLocalState(t *testing.T) *local.State {
t.Helper()
l := local.NewState(local.Config{}, hclog.NewNullLogger(), &token.Store{})
l.TriggerSyncChanges = func() {}
return l
}
type nullSessionLimiter struct{}
func (nullSessionLimiter) BeginSession() (limiter.Session, error) {
return nullSession{}, nil
}
func (nullSessionLimiter) Run(ctx context.Context) {}
type nullSession struct{}
func (nullSession) End() {}
func (nullSession) Terminated() limiter.SessionTerminatedChan { return nil }
type mockSession struct {
mock.Mock
}
func newMockSession(t *testing.T) *mockSession {
m := &mockSession{}
m.Mock.Test(t)
t.Cleanup(func() { m.AssertExpectations(t) })
return m
}
func (m *mockSession) End() { m.Called() }
func (m *mockSession) Terminated() limiter.SessionTerminatedChan {
return m.Called().Get(0).(limiter.SessionTerminatedChan)
}