You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/agent_test.go

4105 lines
107 KiB

package agent
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-uuid"
"github.com/pascaldekloe/goe/verify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func getService(a *TestAgent, id string) *structs.NodeService {
return a.State.Service(structs.NewServiceID(id, nil))
}
func getCheck(a *TestAgent, id types.CheckID) *structs.HealthCheck {
return a.State.Check(structs.NewCheckID(id, nil))
}
func requireServiceExists(t *testing.T, a *TestAgent, id string) *structs.NodeService {
t.Helper()
svc := getService(a, id)
require.NotNil(t, svc, "missing service %q", id)
return svc
}
func requireServiceMissing(t *testing.T, a *TestAgent, id string) {
t.Helper()
require.Nil(t, getService(a, id), "have service %q (expected missing)", id)
}
func requireCheckExists(t *testing.T, a *TestAgent, id types.CheckID) *structs.HealthCheck {
t.Helper()
chk := getCheck(a, id)
require.NotNil(t, chk, "missing check %q", id)
return chk
}
func requireCheckMissing(t *testing.T, a *TestAgent, id types.CheckID) {
t.Helper()
require.Nil(t, getCheck(a, id), "have check %q (expected missing)", id)
}
func requireCheckExistsMap(t *testing.T, m interface{}, id types.CheckID) {
t.Helper()
require.Contains(t, m, structs.NewCheckID(id, nil), "missing check %q", id)
}
func requireCheckMissingMap(t *testing.T, m interface{}, id types.CheckID) {
t.Helper()
require.NotContains(t, m, structs.NewCheckID(id, nil), "have check %q (expected missing)", id)
}
func externalIP() (string, error) {
addrs, err := net.InterfaceAddrs()
if err != nil {
return "", fmt.Errorf("Unable to lookup network interfaces: %v", err)
}
for _, a := range addrs {
if ipnet, ok := a.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
return ipnet.IP.String(), nil
}
}
}
return "", fmt.Errorf("Unable to find a non-loopback interface")
}
func TestAgent_MultiStartStop(t *testing.T) {
for i := 0; i < 10; i++ {
t.Run("", func(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
time.Sleep(250 * time.Millisecond)
a.Shutdown()
})
}
}
func TestAgent_ConnectClusterIDConfig(t *testing.T) {
tests := []struct {
name string
hcl string
wantClusterID string
wantErr bool
}{
{
name: "default TestAgent has fixed cluster id",
hcl: "",
wantClusterID: connect.TestClusterID,
},
{
name: "no cluster ID specified sets to test ID",
hcl: "connect { enabled = true }",
wantClusterID: connect.TestClusterID,
},
{
name: "non-UUID cluster_id is fatal",
hcl: `connect {
enabled = true
ca_config {
cluster_id = "fake-id"
}
}`,
wantClusterID: "",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// This is a rare case where using a constructor for TestAgent
// (NewTestAgent and the likes) won't work, since we expect an error
// in one test case, and the constructors have built-in retry logic
// that runs automatically upon error.
a := &TestAgent{Name: tt.name, HCL: tt.hcl, LogOutput: testutil.TestWriter(t)}
err := a.Start()
if tt.wantErr {
if err == nil {
t.Fatal("expected error, got nil")
}
return // don't run the rest of the test
}
if !tt.wantErr && err != nil {
t.Fatal(err)
}
defer a.Shutdown()
cfg := a.consulConfig()
assert.Equal(t, tt.wantClusterID, cfg.CAConfig.ClusterID)
})
}
}
func TestAgent_StartStop(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
if err := a.Leave(); err != nil {
t.Fatalf("err: %v", err)
}
if err := a.Shutdown(); err != nil {
t.Fatalf("err: %v", err)
}
select {
case <-a.ShutdownCh():
default:
t.Fatalf("should be closed")
}
}
func TestAgent_RPCPing(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
var out struct{}
if err := a.RPC("Status.Ping", struct{}{}, &out); err != nil {
t.Fatalf("err: %v", err)
}
}
func TestAgent_TokenStore(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
acl_token = "user"
acl_agent_token = "agent"
acl_agent_master_token = "master"`,
)
defer a.Shutdown()
if got, want := a.tokens.UserToken(), "user"; got != want {
t.Fatalf("got %q want %q", got, want)
}
if got, want := a.tokens.AgentToken(), "agent"; got != want {
t.Fatalf("got %q want %q", got, want)
}
if got, want := a.tokens.IsAgentMasterToken("master"), true; got != want {
t.Fatalf("got %v want %v", got, want)
}
}
func TestAgent_ReconnectConfigSettings(t *testing.T) {
t.Parallel()
func() {
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
lan := a.consulConfig().SerfLANConfig.ReconnectTimeout
if lan != 3*24*time.Hour {
t.Fatalf("bad: %s", lan.String())
}
wan := a.consulConfig().SerfWANConfig.ReconnectTimeout
if wan != 3*24*time.Hour {
t.Fatalf("bad: %s", wan.String())
}
}()
func() {
a := NewTestAgent(t, t.Name(), `
reconnect_timeout = "24h"
reconnect_timeout_wan = "36h"
`)
defer a.Shutdown()
lan := a.consulConfig().SerfLANConfig.ReconnectTimeout
if lan != 24*time.Hour {
t.Fatalf("bad: %s", lan.String())
}
wan := a.consulConfig().SerfWANConfig.ReconnectTimeout
if wan != 36*time.Hour {
t.Fatalf("bad: %s", wan.String())
}
}()
}
func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
ports { serf_wan = -1 }
reconnect_timeout_wan = "36h"
`)
defer a.Shutdown()
// This is also testing that we dont panic like before #4515
require.Nil(t, a.consulConfig().SerfWANConfig)
}
func TestAgent_setupNodeID(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
node_id = ""
`)
defer a.Shutdown()
cfg := a.config
// The auto-assigned ID should be valid.
id := a.consulConfig().NodeID
if _, err := uuid.ParseUUID(string(id)); err != nil {
t.Fatalf("err: %v", err)
}
// Running again should get the same ID (persisted in the file).
cfg.NodeID = ""
if err := a.setupNodeID(cfg); err != nil {
t.Fatalf("err: %v", err)
}
if newID := a.consulConfig().NodeID; id != newID {
t.Fatalf("bad: %q vs %q", id, newID)
}
// Set an invalid ID via.Config.
cfg.NodeID = types.NodeID("nope")
err := a.setupNodeID(cfg)
if err == nil || !strings.Contains(err.Error(), "uuid string is wrong length") {
t.Fatalf("err: %v", err)
}
// Set a valid ID via.Config.
newID, err := uuid.GenerateUUID()
if err != nil {
t.Fatalf("err: %v", err)
}
cfg.NodeID = types.NodeID(strings.ToUpper(newID))
if err := a.setupNodeID(cfg); err != nil {
t.Fatalf("err: %v", err)
}
if id := a.consulConfig().NodeID; string(id) != newID {
t.Fatalf("bad: %q vs. %q", id, newID)
}
// Set an invalid ID via the file.
fileID := filepath.Join(cfg.DataDir, "node-id")
if err := ioutil.WriteFile(fileID, []byte("adf4238a!882b!9ddc!4a9d!5b6758e4159e"), 0600); err != nil {
t.Fatalf("err: %v", err)
}
cfg.NodeID = ""
err = a.setupNodeID(cfg)
if err == nil || !strings.Contains(err.Error(), "uuid is improperly formatted") {
t.Fatalf("err: %v", err)
}
// Set a valid ID via the file.
if err := ioutil.WriteFile(fileID, []byte("ADF4238a-882b-9ddc-4a9d-5b6758e4159e"), 0600); err != nil {
t.Fatalf("err: %v", err)
}
cfg.NodeID = ""
if err := a.setupNodeID(cfg); err != nil {
t.Fatalf("err: %v", err)
}
if id := a.consulConfig().NodeID; string(id) != "adf4238a-882b-9ddc-4a9d-5b6758e4159e" {
t.Fatalf("bad: %q vs. %q", id, newID)
}
}
func TestAgent_makeNodeID(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
node_id = ""
`)
defer a.Shutdown()
// We should get a valid host-based ID initially.
id, err := a.makeNodeID()
if err != nil {
t.Fatalf("err: %v", err)
}
if _, err := uuid.ParseUUID(string(id)); err != nil {
t.Fatalf("err: %v", err)
}
// Calling again should yield a random ID by default.
another, err := a.makeNodeID()
if err != nil {
t.Fatalf("err: %v", err)
}
if id == another {
t.Fatalf("bad: %s vs %s", id, another)
}
// Turn on host-based IDs and try again. We should get the same ID
// each time (and a different one from the random one above).
a.Config.DisableHostNodeID = false
id, err = a.makeNodeID()
if err != nil {
t.Fatalf("err: %v", err)
}
if id == another {
t.Fatalf("bad: %s vs %s", id, another)
}
// Calling again should yield the host-based ID.
another, err = a.makeNodeID()
if err != nil {
t.Fatalf("err: %v", err)
}
if id != another {
t.Fatalf("bad: %s vs %s", id, another)
}
}
func TestAgent_AddService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddService(t, "enable_central_service_config = true")
})
}
func testAgent_AddService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`+extraHCL)
defer a.Shutdown()
tests := []struct {
desc string
srv *structs.NodeService
wantSrv func(ns *structs.NodeService)
chkTypes []*structs.CheckType
healthChks map[string]*structs.HealthCheck
}{
{
"one check",
&structs.NodeService{
ID: "svcid1",
Service: "svcname1",
Tags: []string{"tag1"},
Weights: nil, // nil weights...
Port: 8100,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
// ... should be populated to avoid "IsSame" returning true during AE.
func(ns *structs.NodeService) {
ns.Weights = &structs.Weights{
Passing: 1,
Warning: 1,
}
},
[]*structs.CheckType{
&structs.CheckType{
CheckID: "check1",
Name: "name1",
TTL: time.Minute,
Notes: "note1",
},
},
map[string]*structs.HealthCheck{
"check1": &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "name1",
Status: "critical",
Notes: "note1",
ServiceID: "svcid1",
ServiceName: "svcname1",
ServiceTags: []string{"tag1"},
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
},
{
"multiple checks",
&structs.NodeService{
ID: "svcid2",
Service: "svcname2",
Weights: &structs.Weights{
Passing: 2,
Warning: 1,
},
Tags: []string{"tag2"},
Port: 8200,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
nil, // No change expected
[]*structs.CheckType{
&structs.CheckType{
CheckID: "check1",
Name: "name1",
TTL: time.Minute,
Notes: "note1",
},
&structs.CheckType{
CheckID: "check-noname",
TTL: time.Minute,
},
&structs.CheckType{
Name: "check-noid",
TTL: time.Minute,
},
&structs.CheckType{
TTL: time.Minute,
},
},
map[string]*structs.HealthCheck{
"check1": &structs.HealthCheck{
Node: "node1",
CheckID: "check1",
Name: "name1",
Status: "critical",
Notes: "note1",
ServiceID: "svcid2",
ServiceName: "svcname2",
ServiceTags: []string{"tag2"},
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
"check-noname": &structs.HealthCheck{
Node: "node1",
CheckID: "check-noname",
Name: "Service 'svcname2' check",
Status: "critical",
ServiceID: "svcid2",
ServiceName: "svcname2",
ServiceTags: []string{"tag2"},
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
"service:svcid2:3": &structs.HealthCheck{
Node: "node1",
CheckID: "service:svcid2:3",
Name: "check-noid",
Status: "critical",
ServiceID: "svcid2",
ServiceName: "svcname2",
ServiceTags: []string{"tag2"},
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
"service:svcid2:4": &structs.HealthCheck{
Node: "node1",
CheckID: "service:svcid2:4",
Name: "Service 'svcname2' check",
Status: "critical",
ServiceID: "svcid2",
ServiceName: "svcname2",
ServiceTags: []string{"tag2"},
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
},
},
},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
// check the service registration
t.Run(tt.srv.ID, func(t *testing.T) {
err := a.AddService(tt.srv, tt.chkTypes, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
got := getService(a, tt.srv.ID)
// Make a copy since the tt.srv points to the one in memory in the local
// state still so changing it is a tautology!
want := *tt.srv
if tt.wantSrv != nil {
tt.wantSrv(&want)
}
require.Equal(t, &want, got)
require.True(t, got.IsSame(&want))
})
// check the health checks
for k, v := range tt.healthChks {
t.Run(k, func(t *testing.T) {
got := getCheck(a, types.CheckID(k))
require.Equal(t, v, got)
})
}
// check the ttl checks
for k := range tt.healthChks {
t.Run(k+" ttl", func(t *testing.T) {
chk := a.checkTTLs[structs.NewCheckID(types.CheckID(k), nil)]
if chk == nil {
t.Fatal("got nil want TTL check")
}
if got, want := string(chk.CheckID.ID), k; got != want {
t.Fatalf("got CheckID %v want %v", got, want)
}
if got, want := chk.TTL, time.Minute; got != want {
t.Fatalf("got TTL %v want %v", got, want)
}
})
}
})
}
}
func TestAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddServices_AliasUpdateCheckNotReverted(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddServices_AliasUpdateCheckNotReverted(t, "enable_central_service_config = true")
})
}
func testAgent_AddServices_AliasUpdateCheckNotReverted(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`+extraHCL)
defer a.Shutdown()
// It's tricky to get an UpdateCheck call to be timed properly so it lands
// right in the middle of an addServiceInternal call so we cheat a bit and
// rely upon alias checks to do that work for us. We add enough services
// that probabilistically one of them is going to end up properly in the
// critical section.
//
// The first number I picked here (10) surprisingly failed every time prior
// to PR #6144 solving the underlying problem.
const numServices = 10
services := make([]*structs.ServiceDefinition, numServices)
checkIDs := make([]types.CheckID, numServices)
for i := 0; i < numServices; i++ {
name := fmt.Sprintf("web-%d", i)
services[i] = &structs.ServiceDefinition{
ID: name,
Name: name,
Port: 8080 + i,
Checks: []*structs.CheckType{
&structs.CheckType{
Name: "alias-for-fake-service",
AliasService: "fake",
},
},
}
checkIDs[i] = types.CheckID("service:" + name)
}
// Add all of the services quickly as you might do from config file snippets.
for _, service := range services {
ns := service.NodeService()
chkTypes, err := service.CheckTypes()
require.NoError(t, err)
require.NoError(t, a.AddService(ns, chkTypes, false, service.Token, ConfigSourceLocal))
}
retry.Run(t, func(r *retry.R) {
gotChecks := a.State.Checks(nil)
for id, check := range gotChecks {
require.Equal(r, "passing", check.Status, "check %q is wrong", id)
require.Equal(r, "No checks found.", check.Output, "check %q is wrong", id)
}
})
}
func TestAgent_AddServiceNoExec(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoExec(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoExec(t, "enable_central_service_config = true")
})
}
func testAgent_AddServiceNoExec(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`+extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv := &structs.NodeService{
ID: "svcid1",
Service: "svcname1",
Tags: []string{"tag1"},
Port: 8100,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceLocal)
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
t.Fatalf("err: %v", err)
}
err = a.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote)
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
t.Fatalf("err: %v", err)
}
}
func TestAgent_AddServiceNoRemoteExec(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoRemoteExec(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddServiceNoRemoteExec(t, "enable_central_service_config = true")
})
}
func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
enable_local_script_checks = true
`+extraHCL)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv := &structs.NodeService{
ID: "svcid1",
Service: "svcname1",
Tags: []string{"tag1"},
Port: 8100,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddService(srv, []*structs.CheckType{chk}, false, "", ConfigSourceRemote)
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
t.Fatalf("err: %v", err)
}
}
func TestAddServiceIPv4TaggedDefault(t *testing.T) {
t.Helper()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv := &structs.NodeService{
Service: "my_service",
ID: "my_service_id",
Port: 8100,
Address: "10.0.1.2",
}
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
require.NotNil(t, ns)
svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port}
require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv4])
require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressWANIPv4])
_, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv6]
require.False(t, ok)
_, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv6]
require.False(t, ok)
}
func TestAddServiceIPv6TaggedDefault(t *testing.T) {
t.Helper()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv := &structs.NodeService{
Service: "my_service",
ID: "my_service_id",
Port: 8100,
Address: "::5",
}
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
require.NotNil(t, ns)
svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port}
require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv6])
require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressWANIPv6])
_, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv4]
require.False(t, ok)
_, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv4]
require.False(t, ok)
}
func TestAddServiceIPv4TaggedSet(t *testing.T) {
t.Helper()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv := &structs.NodeService{
Service: "my_service",
ID: "my_service_id",
Port: 8100,
Address: "10.0.1.2",
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWANIPv4: {
Address: "10.100.200.5",
Port: 8100,
},
},
}
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
require.NotNil(t, ns)
svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port}
require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv4])
require.Equal(t, structs.ServiceAddress{Address: "10.100.200.5", Port: 8100}, ns.TaggedAddresses[structs.TaggedAddressWANIPv4])
_, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv6]
require.False(t, ok)
_, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv6]
require.False(t, ok)
}
func TestAddServiceIPv6TaggedSet(t *testing.T) {
t.Helper()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
srv := &structs.NodeService{
Service: "my_service",
ID: "my_service_id",
Port: 8100,
Address: "::5",
TaggedAddresses: map[string]structs.ServiceAddress{
structs.TaggedAddressWANIPv6: {
Address: "::6",
Port: 8100,
},
},
}
err := a.AddService(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
ns := a.State.Service(structs.NewServiceID("my_service_id", nil))
require.NotNil(t, ns)
svcAddr := structs.ServiceAddress{Address: srv.Address, Port: srv.Port}
require.Equal(t, svcAddr, ns.TaggedAddresses[structs.TaggedAddressLANIPv6])
require.Equal(t, structs.ServiceAddress{Address: "::6", Port: 8100}, ns.TaggedAddresses[structs.TaggedAddressWANIPv6])
_, ok := ns.TaggedAddresses[structs.TaggedAddressLANIPv4]
require.False(t, ok)
_, ok = ns.TaggedAddresses[structs.TaggedAddressWANIPv4]
require.False(t, ok)
}
func TestAgent_RemoveService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RemoveService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RemoveService(t, "enable_central_service_config = true")
})
}
func testAgent_RemoveService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
// Remove a service that doesn't exist
if err := a.RemoveService(structs.NewServiceID("redis", nil)); err != nil {
t.Fatalf("err: %v", err)
}
// Remove without an ID
if err := a.RemoveService(structs.NewServiceID("", nil)); err == nil {
t.Fatalf("should have errored")
}
// Removing a service with a single check works
{
srv := &structs.NodeService{
ID: "memcache",
Service: "memcache",
Port: 8000,
}
chkTypes := []*structs.CheckType{&structs.CheckType{TTL: time.Minute}}
if err := a.AddService(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Add a check after the fact with a specific check ID
check := &structs.CheckDefinition{
ID: "check2",
Name: "check2",
ServiceID: "memcache",
TTL: time.Minute,
}
hc := check.HealthCheck("node1")
if err := a.AddCheck(hc, check.CheckType(), false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
if err := a.RemoveService(structs.NewServiceID("memcache", nil)); err != nil {
t.Fatalf("err: %s", err)
}
require.Nil(t, a.State.Check(structs.NewCheckID("service:memcache", nil)), "have memcache check")
require.Nil(t, a.State.Check(structs.NewCheckID("check2", nil)), "have check2 check")
}
// Removing a service with multiple checks works
{
// add a service to remove
srv := &structs.NodeService{
ID: "redis",
Service: "redis",
Port: 8000,
}
chkTypes := []*structs.CheckType{
&structs.CheckType{TTL: time.Minute},
&structs.CheckType{TTL: 30 * time.Second},
}
if err := a.AddService(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// add another service that wont be affected
srv = &structs.NodeService{
ID: "mysql",
Service: "mysql",
Port: 3306,
}
chkTypes = []*structs.CheckType{
&structs.CheckType{TTL: time.Minute},
&structs.CheckType{TTL: 30 * time.Second},
}
if err := a.AddService(srv, chkTypes, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Remove the service
if err := a.RemoveService(structs.NewServiceID("redis", nil)); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a state mapping
requireServiceMissing(t, a, "redis")
// Ensure checks were removed
requireCheckMissing(t, a, "service:redis:1")
requireCheckMissing(t, a, "service:redis:2")
requireCheckMissingMap(t, a.checkTTLs, "service:redis:1")
requireCheckMissingMap(t, a.checkTTLs, "service:redis:2")
// check the mysql service is unnafected
requireCheckExistsMap(t, a.checkTTLs, "service:mysql:1")
requireCheckExists(t, a, "service:mysql:1")
requireCheckExistsMap(t, a.checkTTLs, "service:mysql:2")
requireCheckExists(t, a, "service:mysql:2")
}
}
func TestAgent_RemoveServiceRemovesAllChecks(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_RemoveServiceRemovesAllChecks(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_RemoveServiceRemovesAllChecks(t, "enable_central_service_config = true")
})
}
func testAgent_RemoveServiceRemovesAllChecks(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
node_name = "node1"
`+extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{ID: "redis", Service: "redis", Port: 8000, EnterpriseMeta: *structs.DefaultEnterpriseMeta()}
chk1 := &structs.CheckType{CheckID: "chk1", Name: "chk1", TTL: time.Minute}
chk2 := &structs.CheckType{CheckID: "chk2", Name: "chk2", TTL: 2 * time.Minute}
hchk1 := &structs.HealthCheck{
Node: "node1",
CheckID: "chk1",
Name: "chk1",
Status: "critical",
ServiceID: "redis",
ServiceName: "redis",
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
hchk2 := &structs.HealthCheck{Node: "node1",
CheckID: "chk2",
Name: "chk2",
Status: "critical",
ServiceID: "redis",
ServiceName: "redis",
Type: "ttl",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
// register service with chk1
if err := a.AddService(svc, []*structs.CheckType{chk1}, false, "", ConfigSourceLocal); err != nil {
t.Fatal("Failed to register service", err)
}
// verify chk1 exists
requireCheckExists(t, a, "chk1")
// update the service with chk2
if err := a.AddService(svc, []*structs.CheckType{chk2}, false, "", ConfigSourceLocal); err != nil {
t.Fatal("Failed to update service", err)
}
// check that both checks are there
require.Equal(t, hchk1, getCheck(a, "chk1"))
require.Equal(t, hchk2, getCheck(a, "chk2"))
// Remove service
if err := a.RemoveService(structs.NewServiceID("redis", nil)); err != nil {
t.Fatal("Failed to remove service", err)
}
// Check that both checks are gone
requireCheckMissing(t, a, "chk1")
requireCheckMissing(t, a, "chk2")
}
// TestAgent_IndexChurn is designed to detect a class of issues where
// we would have unnecessary catalog churn from anti-entropy. See issues
// #3259, #3642, #3845, and #3866.
func TestAgent_IndexChurn(t *testing.T) {
t.Parallel()
t.Run("no tags", func(t *testing.T) {
verifyIndexChurn(t, nil)
})
t.Run("with tags", func(t *testing.T) {
verifyIndexChurn(t, []string{"foo", "bar"})
})
}
// verifyIndexChurn registers some things and runs anti-entropy a bunch of times
// in a row to make sure there are no index bumps.
func verifyIndexChurn(t *testing.T, tags []string) {
t.Helper()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
weights := &structs.Weights{
Passing: 1,
Warning: 1,
}
// Ensure we have a leader before we start adding the services
testrpc.WaitForLeader(t, a.RPC, "dc1")
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Port: 8000,
Tags: tags,
Weights: weights,
}
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
chk := &structs.HealthCheck{
CheckID: "redis-check",
Name: "Service-level check",
ServiceID: "redis",
Status: api.HealthCritical,
}
chkt := &structs.CheckType{
TTL: time.Hour,
}
if err := a.AddCheck(chk, chkt, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
chk = &structs.HealthCheck{
CheckID: "node-check",
Name: "Node-level check",
Status: api.HealthCritical,
}
chkt = &structs.CheckType{
TTL: time.Hour,
}
if err := a.AddCheck(chk, chkt, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
if err := a.sync.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
args := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "redis",
}
var before structs.IndexedCheckServiceNodes
// This sleep is so that the serfHealth check is added to the agent
// A value of 375ms is sufficient enough time to ensure the serfHealth
// check is added to an agent. 500ms so that we don't see flakiness ever.
time.Sleep(500 * time.Millisecond)
if err := a.RPC("Health.ServiceNodes", args, &before); err != nil {
t.Fatalf("err: %v", err)
}
for _, name := range before.Nodes[0].Checks {
a.logger.Debug("Registered node", "node", name.Name)
}
if got, want := len(before.Nodes), 1; got != want {
t.Fatalf("got %d want %d", got, want)
}
if got, want := len(before.Nodes[0].Checks), 3; /* incl. serfHealth */ got != want {
t.Fatalf("got %d want %d", got, want)
}
for i := 0; i < 10; i++ {
a.logger.Info("Sync in progress", "iteration", i+1)
if err := a.sync.State.SyncFull(); err != nil {
t.Fatalf("err: %v", err)
}
}
// If this test fails here this means that the Consul-X-Index
// has changed for the RPC, which means that idempotent ops
// are not working as intended.
var after structs.IndexedCheckServiceNodes
if err := a.RPC("Health.ServiceNodes", args, &after); err != nil {
t.Fatalf("err: %v", err)
}
verify.Values(t, "", after, before)
}
func TestAgent_AddCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
enable_script_checks = true
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
sChk := requireCheckExists(t, a, "mem")
// Ensure our check is in the right state
if sChk.Status != api.HealthCritical {
t.Fatalf("check not critical")
}
// Ensure a TTL is setup
requireCheckExistsMap(t, a.checkMonitors, "mem")
}
func TestAgent_AddCheck_StartPassing(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
enable_script_checks = true
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthPassing,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
sChk := requireCheckExists(t, a, "mem")
// Ensure our check is in the right state
if sChk.Status != api.HealthPassing {
t.Fatalf("check not passing")
}
// Ensure a TTL is setup
requireCheckExistsMap(t, a.checkMonitors, "mem")
}
func TestAgent_AddCheck_MinInterval(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
enable_script_checks = true
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: time.Microsecond,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
requireCheckExists(t, a, "mem")
// Ensure a TTL is setup
if mon, ok := a.checkMonitors[structs.NewCheckID("mem", nil)]; !ok {
t.Fatalf("missing mem monitor")
} else if mon.Interval != checks.MinInterval {
t.Fatalf("bad mem monitor interval")
}
}
func TestAgent_AddCheck_MissingService(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
enable_script_checks = true
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "baz",
Name: "baz check 1",
ServiceID: "baz",
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: time.Microsecond,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err == nil || err.Error() != fmt.Sprintf("ServiceID %q does not exist", structs.ServiceIDString("baz", nil)) {
t.Fatalf("expected service id error, got: %v", err)
}
}
func TestAgent_AddCheck_RestoreState(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// Create some state and persist it
ttl := &checks.CheckTTL{
CheckID: structs.NewCheckID("baz", nil),
TTL: time.Minute,
}
err := a.persistCheckState(ttl, api.HealthPassing, "yup")
if err != nil {
t.Fatalf("err: %s", err)
}
// Build and register the check definition and initial state
health := &structs.HealthCheck{
Node: "foo",
CheckID: "baz",
Name: "baz check 1",
}
chk := &structs.CheckType{
TTL: time.Minute,
}
err = a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the check status was restored during registration
check := requireCheckExists(t, a, "baz")
if check.Status != api.HealthPassing {
t.Fatalf("bad: %#v", check)
}
if check.Output != "yup" {
t.Fatalf("bad: %#v", check)
}
}
func TestAgent_AddCheck_ExecDisable(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
t.Fatalf("err: %v", err)
}
// Ensure we don't have a check mapping
requireCheckMissing(t, a, "mem")
err = a.AddCheck(health, chk, false, "", ConfigSourceRemote)
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent") {
t.Fatalf("err: %v", err)
}
// Ensure we don't have a check mapping
requireCheckMissing(t, a, "mem")
}
func TestAgent_AddCheck_ExecRemoteDisable(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
enable_local_script_checks = true
`)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceRemote)
if err == nil || !strings.Contains(err.Error(), "Scripts are disabled on this agent from remote calls") {
t.Fatalf("err: %v", err)
}
// Ensure we don't have a check mapping
requireCheckMissing(t, a, "mem")
}
func TestAgent_AddCheck_GRPC(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "grpchealth",
Name: "grpc health checking protocol",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
GRPC: "localhost:12345/package.Service",
Interval: 15 * time.Second,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
sChk := requireCheckExists(t, a, "grpchealth")
// Ensure our check is in the right state
if sChk.Status != api.HealthCritical {
t.Fatalf("check not critical")
}
// Ensure a check is setup
requireCheckExistsMap(t, a.checkGRPCs, "grpchealth")
}
func TestAgent_RestoreServiceWithAliasCheck(t *testing.T) {
// t.Parallel() don't even think about making this parallel
// This test is very contrived and tests for the absence of race conditions
// related to the implementation of alias checks. As such it is slow,
// serial, full of sleeps and retries, and not generally a great test to
// run all of the time.
//
// That said it made it incredibly easy to root out various race conditions
// quite successfully.
//
// The original set of races was between:
//
// - agent startup reloading Services and Checks from disk
// - API requests to also re-register those same Services and Checks
// - the goroutines for the as-yet-to-be-stopped CheckAlias goroutines
if os.Getenv("SLOWTEST") != "1" {
t.Skip("skipping slow test; set SLOWTEST=1 to run")
return
}
// We do this so that the agent logs and the informational messages from
// the test itself are interwoven properly.
logf := func(t *testing.T, a *TestAgent, format string, args ...interface{}) {
a.logger.Info("testharness: " + fmt.Sprintf(format, args...))
}
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
cfg := `
server = false
bootstrap = false
enable_central_service_config = false
data_dir = "` + dataDir + `"
`
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer os.RemoveAll(dataDir)
defer a.Shutdown()
testCtx, testCancel := context.WithCancel(context.Background())
defer testCancel()
testHTTPServer, returnPort := launchHTTPCheckServer(t, testCtx)
defer func() {
testHTTPServer.Close()
returnPort()
}()
registerServicesAndChecks := func(t *testing.T, a *TestAgent) {
// add one persistent service with a simple check
require.NoError(t, a.AddService(
&structs.NodeService{
ID: "ping",
Service: "ping",
Port: 8000,
},
[]*structs.CheckType{
&structs.CheckType{
HTTP: testHTTPServer.URL,
Method: "GET",
Interval: 5 * time.Second,
Timeout: 1 * time.Second,
},
},
true, "", ConfigSourceLocal,
))
// add one persistent sidecar service with an alias check in the manner
// of how sidecar_service would add it
require.NoError(t, a.AddService(
&structs.NodeService{
ID: "ping-sidecar-proxy",
Service: "ping-sidecar-proxy",
Port: 9000,
},
[]*structs.CheckType{
&structs.CheckType{
Name: "Connect Sidecar Aliasing ping",
AliasService: "ping",
},
},
true, "", ConfigSourceLocal,
))
}
retryUntilCheckState := func(t *testing.T, a *TestAgent, checkID string, expectedStatus string) {
t.Helper()
retry.Run(t, func(r *retry.R) {
chk := requireCheckExists(t, a, types.CheckID(checkID))
if chk.Status != expectedStatus {
logf(t, a, "check=%q expected status %q but got %q", checkID, expectedStatus, chk.Status)
r.Fatalf("check=%q expected status %q but got %q", checkID, expectedStatus, chk.Status)
}
logf(t, a, "check %q has reached desired status %q", checkID, expectedStatus)
})
}
registerServicesAndChecks(t, a)
time.Sleep(1 * time.Second)
retryUntilCheckState(t, a, "service:ping", api.HealthPassing)
retryUntilCheckState(t, a, "service:ping-sidecar-proxy", api.HealthPassing)
logf(t, a, "==== POWERING DOWN ORIGINAL ====")
require.NoError(t, a.Shutdown())
time.Sleep(1 * time.Second)
futureHCL := cfg + `
node_id = "` + string(a.Config.NodeID) + `"
node_name = "` + a.Config.NodeName + `"
`
restartOnce := func(idx int, t *testing.T) {
t.Helper()
// Reload and retain former NodeID and data directory.
a2 := NewTestAgentWithFields(t, true, TestAgent{HCL: futureHCL, DataDir: dataDir})
defer a2.Shutdown()
a = nil
// reregister during standup; we use an adjustable timing to try and force a race
sleepDur := time.Duration(idx+1) * 500 * time.Millisecond
time.Sleep(sleepDur)
logf(t, a2, "re-registering checks and services after a delay of %v", sleepDur)
for i := 0; i < 20; i++ { // RACE RACE RACE!
registerServicesAndChecks(t, a2)
time.Sleep(50 * time.Millisecond)
}
time.Sleep(1 * time.Second)
retryUntilCheckState(t, a2, "service:ping", api.HealthPassing)
logf(t, a2, "giving the alias check a chance to notice...")
time.Sleep(5 * time.Second)
retryUntilCheckState(t, a2, "service:ping-sidecar-proxy", api.HealthPassing)
}
for i := 0; i < 20; i++ {
name := "restart-" + strconv.Itoa(i)
ok := t.Run(name, func(t *testing.T) {
restartOnce(i, t)
})
require.True(t, ok, name+" failed")
}
}
func launchHTTPCheckServer(t *testing.T, ctx context.Context) (srv *httptest.Server, returnPortsFn func()) {
ports := freeport.MustTake(1)
port := ports[0]
addr := net.JoinHostPort("127.0.0.1", strconv.Itoa(port))
var lc net.ListenConfig
listener, err := lc.Listen(ctx, "tcp", addr)
require.NoError(t, err)
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK\n"))
})
srv = &httptest.Server{
Listener: listener,
Config: &http.Server{Handler: handler},
}
srv.Start()
return srv, func() { freeport.Return(ports) }
}
func TestAgent_AddCheck_Alias(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
require.NoError(err)
// Ensure we have a check mapping
sChk := requireCheckExists(t, a, "aliashealth")
require.Equal(api.HealthCritical, sChk.Status)
chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)]
require.True(ok, "missing aliashealth check")
require.Equal("", chkImpl.RPCReq.Token)
cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil))
require.NotNil(cs)
require.Equal("", cs.Token)
}
func TestAgent_AddCheck_Alias_setToken(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "foo", ConfigSourceLocal)
require.NoError(err)
cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil))
require.NotNil(cs)
require.Equal("foo", cs.Token)
chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)]
require.True(ok, "missing aliashealth check")
require.Equal("foo", chkImpl.RPCReq.Token)
}
func TestAgent_AddCheck_Alias_userToken(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t, t.Name(), `
acl_token = "hello"
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
require.NoError(err)
cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil))
require.NotNil(cs)
require.Equal("", cs.Token) // State token should still be empty
chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)]
require.True(ok, "missing aliashealth check")
require.Equal("hello", chkImpl.RPCReq.Token) // Check should use the token
}
func TestAgent_AddCheck_Alias_userAndSetToken(t *testing.T) {
t.Parallel()
require := require.New(t)
a := NewTestAgent(t, t.Name(), `
acl_token = "hello"
`)
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "aliashealth",
Name: "Alias health check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
AliasService: "foo",
}
err := a.AddCheck(health, chk, false, "goodbye", ConfigSourceLocal)
require.NoError(err)
cs := a.State.CheckState(structs.NewCheckID("aliashealth", nil))
require.NotNil(cs)
require.Equal("goodbye", cs.Token)
chkImpl, ok := a.checkAliases[structs.NewCheckID("aliashealth", nil)]
require.True(ok, "missing aliashealth check")
require.Equal("goodbye", chkImpl.RPCReq.Token)
}
func TestAgent_RemoveCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
enable_script_checks = true
`)
defer a.Shutdown()
// Remove check that doesn't exist
if err := a.RemoveCheck(structs.NewCheckID("mem", nil), false); err != nil {
t.Fatalf("err: %v", err)
}
// Remove without an ID
if err := a.RemoveCheck(structs.NewCheckID("", nil), false); err == nil {
t.Fatalf("should have errored")
}
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
ScriptArgs: []string{"exit", "0"},
Interval: 15 * time.Second,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
// Remove check
if err := a.RemoveCheck(structs.NewCheckID("mem", nil), false); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping
requireCheckMissing(t, a, "mem")
// Ensure a TTL is setup
requireCheckMissingMap(t, a.checkMonitors, "mem")
}
func TestAgent_HTTPCheck_TLSSkipVerify(t *testing.T) {
t.Parallel()
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "GOOD")
})
server := httptest.NewTLSServer(handler)
defer server.Close()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "tls",
Name: "tls check",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
HTTP: server.URL,
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
retry.Run(t, func(r *retry.R) {
status := getCheck(a, "tls")
if status.Status != api.HealthPassing {
r.Fatalf("bad: %v", status.Status)
}
if !strings.Contains(status.Output, "GOOD") {
r.Fatalf("bad: %v", status.Output)
}
})
}
func TestAgent_HTTPCheck_EnableAgentTLSForChecks(t *testing.T) {
t.Parallel()
run := func(t *testing.T, ca string) {
a := NewTestAgentWithFields(t, true, TestAgent{
Name: t.Name(),
UseTLS: true,
HCL: `
enable_agent_tls_for_checks = true
verify_incoming = true
server_name = "consul.test"
key_file = "../test/client_certs/server.key"
cert_file = "../test/client_certs/server.crt"
` + ca,
})
defer a.Shutdown()
health := &structs.HealthCheck{
Node: "foo",
CheckID: "tls",
Name: "tls check",
Status: api.HealthCritical,
}
url := fmt.Sprintf("https://%s/v1/agent/self", a.srv.ln.Addr().String())
chk := &structs.CheckType{
HTTP: url,
Interval: 20 * time.Millisecond,
}
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
retry.Run(t, func(r *retry.R) {
status := getCheck(a, "tls")
if status.Status != api.HealthPassing {
r.Fatalf("bad: %v", status.Status)
}
if !strings.Contains(status.Output, "200 OK") {
r.Fatalf("bad: %v", status.Output)
}
})
}
// We need to test both methods of passing the CA info to ensure that
// we propagate all the fields correctly. All the other fields are
// covered by the HCL in the test run function.
tests := []struct {
desc string
config string
}{
{"ca_file", `ca_file = "../test/client_certs/rootca.crt"`},
{"ca_path", `ca_path = "../test/client_certs/path"`},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
run(t, tt.config)
})
}
}
func TestAgent_updateTTLCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
checkBufSize := 100
health := &structs.HealthCheck{
Node: "foo",
CheckID: "mem",
Name: "memory util",
Status: api.HealthCritical,
}
chk := &structs.CheckType{
TTL: 15 * time.Second,
OutputMaxSize: checkBufSize,
}
// Add check and update it.
err := a.AddCheck(health, chk, false, "", ConfigSourceLocal)
if err != nil {
t.Fatalf("err: %v", err)
}
if err := a.updateTTLCheck(structs.NewCheckID("mem", nil), api.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping.
status := getCheck(a, "mem")
if status.Status != api.HealthPassing {
t.Fatalf("bad: %v", status)
}
if status.Output != "foo" {
t.Fatalf("bad: %v", status)
}
if err := a.updateTTLCheck(structs.NewCheckID("mem", nil), api.HealthCritical, strings.Repeat("--bad-- ", 5*checkBufSize)); err != nil {
t.Fatalf("err: %v", err)
}
// Ensure we have a check mapping.
status = getCheck(a, "mem")
if status.Status != api.HealthCritical {
t.Fatalf("bad: %v", status)
}
if len(status.Output) > checkBufSize*2 {
t.Fatalf("bad: %v", len(status.Output))
}
}
func TestAgent_PersistService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_PersistService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_PersistService(t, "enable_central_service_config = true")
})
}
func testAgent_PersistService(t *testing.T, extraHCL string) {
t.Helper()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
cfg := `
server = false
bootstrap = false
data_dir = "` + dataDir + `"
` + extraHCL
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
// Check is not persisted unless requested
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := os.Stat(file); err == nil {
t.Fatalf("should not persist")
}
// Persists to file if requested
if err := a.AddService(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
expected, err := json.Marshal(persistedService{
Token: "mytoken",
Service: svc,
Source: "local",
})
if err != nil {
t.Fatalf("err: %s", err)
}
content, err := ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
}
// Updates service definition on disk
svc.Port = 8001
if err := a.AddService(svc, nil, true, "mytoken", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
expected, err = json.Marshal(persistedService{
Token: "mytoken",
Service: svc,
Source: "local",
})
if err != nil {
t.Fatalf("err: %s", err)
}
content, err = ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
if !bytes.Equal(expected, content) {
t.Fatalf("bad: %s", string(content))
}
a.Shutdown()
// Should load it back during later start
a2 := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a2.Shutdown()
restored := a2.State.ServiceState(structs.NewServiceID(svc.ID, nil))
if restored == nil {
t.Fatalf("service %q missing", svc.ID)
}
if got, want := restored.Token, "mytoken"; got != want {
t.Fatalf("got token %q want %q", got, want)
}
if got, want := restored.Service.Port, 8001; got != want {
t.Fatalf("got port %d want %d", got, want)
}
}
func TestAgent_persistedService_compat(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_persistedService_compat(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_persistedService_compat(t, "enable_central_service_config = true")
})
}
func testAgent_persistedService_compat(t *testing.T, extraHCL string) {
t.Helper()
// Tests backwards compatibility of persisted services from pre-0.5.1
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
TaggedAddresses: map[string]structs.ServiceAddress{},
Weights: &structs.Weights{Passing: 1, Warning: 1},
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
// Encode the NodeService directly. This is what previous versions
// would serialize to the file (without the wrapper)
encoded, err := json.Marshal(svc)
if err != nil {
t.Fatalf("err: %s", err)
}
// Write the content to the file
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
if err := os.MkdirAll(filepath.Dir(file), 0700); err != nil {
t.Fatalf("err: %s", err)
}
if err := ioutil.WriteFile(file, encoded, 0600); err != nil {
t.Fatalf("err: %s", err)
}
// Load the services
if err := a.loadServices(a.Config); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the service was restored
result := requireServiceExists(t, a, "redis")
require.Equal(t, svc, result)
}
func TestAgent_PurgeService(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_PurgeService(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_PurgeService(t, "enable_central_service_config = true")
})
}
func testAgent_PurgeService(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
file := filepath.Join(a.Config.DataDir, servicesDir, stringHash(svc.ID))
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Exists
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
// Not removed
if err := a.removeService(structs.NewServiceID(svc.ID, nil), false); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
// Re-add the service
if err := a.AddService(svc, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Removed
if err := a.removeService(structs.NewServiceID(svc.ID, nil), true); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("bad: %#v", err)
}
}
func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_PurgeServiceOnDuplicate(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_PurgeServiceOnDuplicate(t, "enable_central_service_config = true")
})
}
func testAgent_PurgeServiceOnDuplicate(t *testing.T, extraHCL string) {
t.Helper()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
cfg := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
` + extraHCL
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer a.Shutdown()
svc1 := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// First persist the service
require.NoError(t, a.AddService(svc1, nil, true, "", ConfigSourceLocal))
a.Shutdown()
// Try bringing the agent back up with the service already
// existing in the config
a2 := NewTestAgentWithFields(t, true, TestAgent{Name: t.Name() + "-a2", HCL: cfg + `
service = {
id = "redis"
name = "redis"
tags = ["bar"]
port = 9000
}
`, DataDir: dataDir})
defer a2.Shutdown()
sid := svc1.CompoundServiceID()
file := filepath.Join(a.Config.DataDir, servicesDir, sid.StringHash())
_, err := os.Stat(file)
require.Error(t, err, "should have removed persisted service")
result := requireServiceExists(t, a, "redis")
require.NotEqual(t, []string{"bar"}, result.Tags)
require.NotEqual(t, 9000, result.Port)
}
func TestAgent_PersistCheck(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
cfg := `
data_dir = "` + dataDir + `"
server = false
bootstrap = false
enable_script_checks = true
`
a := NewTestAgentWithFields(t, true, TestAgent{HCL: cfg, DataDir: dataDir})
defer os.RemoveAll(dataDir)
defer a.Shutdown()
check := &structs.HealthCheck{
Node: a.config.NodeName,
CheckID: "mem",
Name: "memory check",
Status: api.HealthPassing,
}
chkType := &structs.CheckType{
ScriptArgs: []string{"/bin/true"},
Interval: 10 * time.Second,
}
cid := check.CompoundCheckID()
file := filepath.Join(a.Config.DataDir, checksDir, cid.StringHash())
// Not persisted if not requested
require.NoError(t, a.AddCheck(check, chkType, false, "", ConfigSourceLocal))
_, err := os.Stat(file)
require.Error(t, err, "should not persist")
// Should persist if requested
require.NoError(t, a.AddCheck(check, chkType, true, "mytoken", ConfigSourceLocal))
_, err = os.Stat(file)
require.NoError(t, err)
expected, err := json.Marshal(persistedCheck{
Check: check,
ChkType: chkType,
Token: "mytoken",
Source: "local",
})
require.NoError(t, err)
content, err := ioutil.ReadFile(file)
require.NoError(t, err)
require.Equal(t, expected, content)
// Updates the check definition on disk
check.Name = "mem1"
require.NoError(t, a.AddCheck(check, chkType, true, "mytoken", ConfigSourceLocal))
expected, err = json.Marshal(persistedCheck{
Check: check,
ChkType: chkType,
Token: "mytoken",
Source: "local",
})
require.NoError(t, err)
content, err = ioutil.ReadFile(file)
require.NoError(t, err)
require.Equal(t, expected, content)
a.Shutdown()
// Should load it back during later start
a2 := NewTestAgentWithFields(t, true, TestAgent{Name: t.Name() + "-a2", HCL: cfg, DataDir: dataDir})
defer a2.Shutdown()
result := requireCheckExists(t, a2, check.CheckID)
require.Equal(t, api.HealthCritical, result.Status)
require.Equal(t, "mem1", result.Name)
// Should have restored the monitor
requireCheckExistsMap(t, a2.checkMonitors, check.CheckID)
chkState := a2.State.CheckState(structs.NewCheckID(check.CheckID, nil))
require.NotNil(t, chkState)
require.Equal(t, "mytoken", chkState.Token)
}
func TestAgent_PurgeCheck(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
check := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mem",
Name: "memory check",
Status: api.HealthPassing,
}
file := filepath.Join(a.Config.DataDir, checksDir, checkIDHash(check.CheckID))
if err := a.AddCheck(check, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Not removed
if err := a.RemoveCheck(structs.NewCheckID(check.CheckID, nil), false); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); err != nil {
t.Fatalf("err: %s", err)
}
// Removed
if err := a.RemoveCheck(structs.NewCheckID(check.CheckID, nil), true); err != nil {
t.Fatalf("err: %s", err)
}
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("bad: %#v", err)
}
}
func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
t.Parallel()
nodeID := NodeID()
dataDir := testutil.TempDir(t, "agent")
a := NewTestAgentWithFields(t, true, TestAgent{
Name: t.Name(),
DataDir: dataDir,
HCL: `
node_id = "` + nodeID + `"
node_name = "Node ` + nodeID + `"
data_dir = "` + dataDir + `"
server = false
bootstrap = false
enable_script_checks = true
`})
defer os.RemoveAll(dataDir)
defer a.Shutdown()
check1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "mem",
Name: "memory check",
Status: api.HealthPassing,
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
// First persist the check
if err := a.AddCheck(check1, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
a.Shutdown()
// Start again with the check registered in config
a2 := NewTestAgentWithFields(t, true, TestAgent{
Name: t.Name() + "-a2",
DataDir: dataDir,
HCL: `
node_id = "` + nodeID + `"
node_name = "Node ` + nodeID + `"
data_dir = "` + dataDir + `"
server = false
bootstrap = false
enable_script_checks = true
check = {
id = "mem"
name = "memory check"
notes = "my cool notes"
args = ["/bin/check-redis.py"]
interval = "30s"
}
`})
defer a2.Shutdown()
cid := check1.CompoundCheckID()
file := filepath.Join(dataDir, checksDir, cid.StringHash())
if _, err := os.Stat(file); err == nil {
t.Fatalf("should have removed persisted check")
}
result := requireCheckExists(t, a2, "mem")
expected := &structs.HealthCheck{
Node: a2.Config.NodeName,
CheckID: "mem",
Name: "memory check",
Status: api.HealthCritical,
Notes: "my cool notes",
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
}
require.Equal(t, expected, result)
}
func TestAgent_loadChecks_token(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
check = {
id = "rabbitmq"
name = "rabbitmq"
token = "abc123"
ttl = "10s"
}
`)
defer a.Shutdown()
requireCheckExists(t, a, "rabbitmq")
require.Equal(t, "abc123", a.State.CheckToken(structs.NewCheckID("rabbitmq", nil)))
}
func TestAgent_unloadChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// First register a service
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Register a check
check1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "service:redis",
Name: "redischeck",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
if err := a.AddCheck(check1, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
requireCheckExists(t, a, check1.CheckID)
// Unload all of the checks
if err := a.unloadChecks(); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure it was unloaded
requireCheckMissing(t, a, check1.CheckID)
}
func TestAgent_loadServices_token(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_token(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_token(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_token(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
token = "abc123"
}
`+extraHCL)
defer a.Shutdown()
requireServiceExists(t, a, "rabbitmq")
if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq", nil)); token != "abc123" {
t.Fatalf("bad: %s", token)
}
}
func TestAgent_loadServices_sidecar(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecar(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecar(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecar(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
token = "abc123"
connect = {
sidecar_service {}
}
}
`+extraHCL)
defer a.Shutdown()
svc := requireServiceExists(t, a, "rabbitmq")
if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq", nil)); token != "abc123" {
t.Fatalf("bad: %s", token)
}
requireServiceExists(t, a, "rabbitmq-sidecar-proxy")
if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq-sidecar-proxy", nil)); token != "abc123" {
t.Fatalf("bad: %s", token)
}
// Sanity check rabbitmq service should NOT have sidecar info in state since
// it's done it's job and should be a registration syntax sugar only.
assert.Nil(t, svc.Connect.SidecarService)
}
func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarSeparateToken(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarSeparateToken(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecarSeparateToken(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
token = "abc123"
connect = {
sidecar_service {
token = "789xyz"
}
}
}
`+extraHCL)
defer a.Shutdown()
requireServiceExists(t, a, "rabbitmq")
if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq", nil)); token != "abc123" {
t.Fatalf("bad: %s", token)
}
requireServiceExists(t, a, "rabbitmq-sidecar-proxy")
if token := a.State.ServiceToken(structs.NewServiceID("rabbitmq-sidecar-proxy", nil)); token != "789xyz" {
t.Fatalf("bad: %s", token)
}
}
func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarInheritMeta(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarInheritMeta(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecarInheritMeta(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
tags = ["a", "b"],
meta = {
environment = "prod"
}
connect = {
sidecar_service {
}
}
}
`+extraHCL)
defer a.Shutdown()
svc := requireServiceExists(t, a, "rabbitmq")
require.Len(t, svc.Tags, 2)
require.Len(t, svc.Meta, 1)
sidecar := requireServiceExists(t, a, "rabbitmq-sidecar-proxy")
require.ElementsMatch(t, svc.Tags, sidecar.Tags)
require.Len(t, sidecar.Meta, 1)
meta, ok := sidecar.Meta["environment"]
require.True(t, ok, "missing sidecar service meta")
require.Equal(t, "prod", meta)
}
func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarOverrideMeta(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_loadServices_sidecarOverrideMeta(t, "enable_central_service_config = true")
})
}
func testAgent_loadServices_sidecarOverrideMeta(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
tags = ["a", "b"],
meta = {
environment = "prod"
}
connect = {
sidecar_service {
tags = ["foo"],
meta = {
environment = "qa"
}
}
}
}
`+extraHCL)
defer a.Shutdown()
svc := requireServiceExists(t, a, "rabbitmq")
require.Len(t, svc.Tags, 2)
require.Len(t, svc.Meta, 1)
sidecar := requireServiceExists(t, a, "rabbitmq-sidecar-proxy")
require.Len(t, sidecar.Tags, 1)
require.Equal(t, "foo", sidecar.Tags[0])
require.Len(t, sidecar.Meta, 1)
meta, ok := sidecar.Meta["environment"]
require.True(t, ok, "missing sidecar service meta")
require.Equal(t, "qa", meta)
}
func TestAgent_unloadServices(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_unloadServices(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_unloadServices(t, "enable_central_service_config = true")
})
}
func testAgent_unloadServices(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// Register the service
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
requireServiceExists(t, a, svc.ID)
// Unload all services
if err := a.unloadServices(); err != nil {
t.Fatalf("err: %s", err)
}
if len(a.State.Services(structs.WildcardEnterpriseMeta())) != 0 {
t.Fatalf("should have unloaded services")
}
}
func TestAgent_Service_MaintenanceMode(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
// Register the service
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
sid := structs.NewServiceID("redis", nil)
// Enter maintenance mode for the service
if err := a.EnableServiceMaintenance(sid, "broken", "mytoken"); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the critical health check was added
checkID := serviceMaintCheckID(sid)
check := a.State.Check(checkID)
if check == nil {
t.Fatalf("should have registered critical maintenance check")
}
// Check that the token was used to register the check
if token := a.State.CheckToken(checkID); token != "mytoken" {
t.Fatalf("expected 'mytoken', got: '%s'", token)
}
// Ensure the reason was set in notes
if check.Notes != "broken" {
t.Fatalf("bad: %#v", check)
}
// Leave maintenance mode
if err := a.DisableServiceMaintenance(sid); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the check was deregistered
if found := a.State.Check(checkID); found != nil {
t.Fatalf("should have deregistered maintenance check")
}
// Enter service maintenance mode without providing a reason
if err := a.EnableServiceMaintenance(sid, "", ""); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the check was registered with the default notes
check = a.State.Check(checkID)
if check == nil {
t.Fatalf("should have registered critical check")
}
if check.Notes != defaultServiceMaintReason {
t.Fatalf("bad: %#v", check)
}
}
func TestAgent_Service_Reap(t *testing.T) {
// t.Parallel() // timing test. no parallel
a := NewTestAgent(t, t.Name(), `
check_reap_interval = "50ms"
check_deregister_interval_min = "0s"
`)
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
chkTypes := []*structs.CheckType{
&structs.CheckType{
Status: api.HealthPassing,
TTL: 25 * time.Millisecond,
DeregisterCriticalServiceAfter: 200 * time.Millisecond,
},
}
// Register the service.
if err := a.AddService(svc, chkTypes, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's there and there's no critical check yet.
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0, "should not have critical checks")
// Wait for the check TTL to fail but before the check is reaped.
time.Sleep(100 * time.Millisecond)
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(nil), 1, "should have 1 critical check")
// Pass the TTL.
if err := a.updateTTLCheck(structs.NewCheckID("service:redis", nil), api.HealthPassing, "foo"); err != nil {
t.Fatalf("err: %v", err)
}
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0, "should not have critical checks")
// Wait for the check TTL to fail again.
time.Sleep(100 * time.Millisecond)
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 1, "should have 1 critical check")
// Wait for the reap.
time.Sleep(400 * time.Millisecond)
requireServiceMissing(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0, "should not have critical checks")
}
func TestAgent_Service_NoReap(t *testing.T) {
// t.Parallel() // timing test. no parallel
a := NewTestAgent(t, t.Name(), `
check_reap_interval = "50ms"
check_deregister_interval_min = "0s"
`)
defer a.Shutdown()
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
chkTypes := []*structs.CheckType{
&structs.CheckType{
Status: api.HealthPassing,
TTL: 25 * time.Millisecond,
},
}
// Register the service.
if err := a.AddService(svc, chkTypes, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Make sure it's there and there's no critical check yet.
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 0)
// Wait for the check TTL to fail.
time.Sleep(200 * time.Millisecond)
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 1)
// Wait a while and make sure it doesn't reap.
time.Sleep(200 * time.Millisecond)
requireServiceExists(t, a, "redis")
require.Len(t, a.State.CriticalCheckStates(structs.WildcardEnterpriseMeta()), 1)
}
func TestAgent_AddService_restoresSnapshot(t *testing.T) {
t.Run("normal", func(t *testing.T) {
t.Parallel()
testAgent_AddService_restoresSnapshot(t, "")
})
t.Run("service manager", func(t *testing.T) {
t.Parallel()
testAgent_AddService_restoresSnapshot(t, "enable_central_service_config = true")
})
}
func testAgent_AddService_restoresSnapshot(t *testing.T, extraHCL string) {
t.Helper()
a := NewTestAgent(t, t.Name(), extraHCL)
defer a.Shutdown()
// First register a service
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
require.NoError(t, a.AddService(svc, nil, false, "", ConfigSourceLocal))
// Register a check
check1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "service:redis",
Name: "redischeck",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
require.NoError(t, a.AddCheck(check1, nil, false, "", ConfigSourceLocal))
// Re-registering the service preserves the state of the check
chkTypes := []*structs.CheckType{&structs.CheckType{TTL: 30 * time.Second}}
require.NoError(t, a.AddService(svc, chkTypes, false, "", ConfigSourceLocal))
check := requireCheckExists(t, a, "service:redis")
require.Equal(t, api.HealthPassing, check.Status)
}
func TestAgent_AddCheck_restoresSnapshot(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// First register a service
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Register a check
check1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "service:redis",
Name: "redischeck",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
if err := a.AddCheck(check1, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
// Re-registering the check preserves its state
check1.Status = ""
if err := a.AddCheck(check1, &structs.CheckType{TTL: 30 * time.Second}, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
check := requireCheckExists(t, a, "service:redis")
if check.Status != api.HealthPassing {
t.Fatalf("bad: %s", check.Status)
}
}
func TestAgent_NodeMaintenanceMode(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// Enter maintenance mode for the node
a.EnableNodeMaintenance("broken", "mytoken")
// Make sure the critical health check was added
check := requireCheckExists(t, a, structs.NodeMaint)
// Check that the token was used to register the check
if token := a.State.CheckToken(structs.NodeMaintCheckID); token != "mytoken" {
t.Fatalf("expected 'mytoken', got: '%s'", token)
}
// Ensure the reason was set in notes
if check.Notes != "broken" {
t.Fatalf("bad: %#v", check)
}
// Leave maintenance mode
a.DisableNodeMaintenance()
// Ensure the check was deregistered
requireCheckMissing(t, a, structs.NodeMaint)
// Enter maintenance mode without passing a reason
a.EnableNodeMaintenance("", "")
// Make sure the check was registered with the default note
check = requireCheckExists(t, a, structs.NodeMaint)
if check.Notes != defaultNodeMaintReason {
t.Fatalf("bad: %#v", check)
}
}
func TestAgent_checkStateSnapshot(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// First register a service
svc := &structs.NodeService{
ID: "redis",
Service: "redis",
Tags: []string{"foo"},
Port: 8000,
}
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %v", err)
}
// Register a check
check1 := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "service:redis",
Name: "redischeck",
Status: api.HealthPassing,
ServiceID: "redis",
ServiceName: "redis",
}
if err := a.AddCheck(check1, nil, true, "", ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
// Snapshot the state
snap := a.snapshotCheckState()
// Unload all of the checks
if err := a.unloadChecks(); err != nil {
t.Fatalf("err: %s", err)
}
// Reload the checks and restore the snapshot.
if err := a.loadChecks(a.Config, snap); err != nil {
t.Fatalf("err: %s", err)
}
// Search for the check
out := requireCheckExists(t, a, check1.CheckID)
// Make sure state was restored
if out.Status != api.HealthPassing {
t.Fatalf("should have restored check state")
}
}
func TestAgent_loadChecks_checkFails(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// Persist a health check with an invalid service ID
check := &structs.HealthCheck{
Node: a.Config.NodeName,
CheckID: "service:redis",
Name: "redischeck",
Status: api.HealthPassing,
ServiceID: "nope",
}
if err := a.persistCheck(check, nil, ConfigSourceLocal); err != nil {
t.Fatalf("err: %s", err)
}
// Check to make sure the check was persisted
checkHash := checkIDHash(check.CheckID)
checkPath := filepath.Join(a.Config.DataDir, checksDir, checkHash)
if _, err := os.Stat(checkPath); err != nil {
t.Fatalf("err: %s", err)
}
// Try loading the checks from the persisted files
if err := a.loadChecks(a.Config, nil); err != nil {
t.Fatalf("err: %s", err)
}
// Ensure the erroneous check was purged
if _, err := os.Stat(checkPath); err == nil {
t.Fatalf("should have purged check")
}
}
func TestAgent_persistCheckState(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
cid := structs.NewCheckID("check1", nil)
// Create the TTL check to persist
check := &checks.CheckTTL{
CheckID: cid,
TTL: 10 * time.Minute,
}
// Persist some check state for the check
err := a.persistCheckState(check, api.HealthCritical, "nope")
if err != nil {
t.Fatalf("err: %s", err)
}
// Check the persisted file exists and has the content
file := filepath.Join(a.Config.DataDir, checkStateDir, cid.StringHash())
buf, err := ioutil.ReadFile(file)
if err != nil {
t.Fatalf("err: %s", err)
}
// Decode the state
var p persistedCheckState
if err := json.Unmarshal(buf, &p); err != nil {
t.Fatalf("err: %s", err)
}
// Check the fields
if p.CheckID != cid.ID {
t.Fatalf("bad: %#v", p)
}
if p.Output != "nope" {
t.Fatalf("bad: %#v", p)
}
if p.Status != api.HealthCritical {
t.Fatalf("bad: %#v", p)
}
// Check the expiration time was set
if p.Expires < time.Now().Unix() {
t.Fatalf("bad: %#v", p)
}
}
func TestAgent_loadCheckState(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// Create a check whose state will expire immediately
check := &checks.CheckTTL{
CheckID: structs.NewCheckID("check1", nil),
TTL: 0,
}
// Persist the check state
err := a.persistCheckState(check, api.HealthPassing, "yup")
if err != nil {
t.Fatalf("err: %s", err)
}
// Try to load the state
health := &structs.HealthCheck{
CheckID: "check1",
Status: api.HealthCritical,
}
if err := a.loadCheckState(health); err != nil {
t.Fatalf("err: %s", err)
}
// Should not have restored the status due to expiration
if health.Status != api.HealthCritical {
t.Fatalf("bad: %#v", health)
}
if health.Output != "" {
t.Fatalf("bad: %#v", health)
}
// Should have purged the state
file := filepath.Join(a.Config.DataDir, checksDir, stringHash("check1"))
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("should have purged state")
}
// Set a TTL which will not expire before we check it
check.TTL = time.Minute
err = a.persistCheckState(check, api.HealthPassing, "yup")
if err != nil {
t.Fatalf("err: %s", err)
}
// Try to load
if err := a.loadCheckState(health); err != nil {
t.Fatalf("err: %s", err)
}
// Should have restored
if health.Status != api.HealthPassing {
t.Fatalf("bad: %#v", health)
}
if health.Output != "yup" {
t.Fatalf("bad: %#v", health)
}
}
func TestAgent_purgeCheckState(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
cid := structs.NewCheckID("check1", nil)
// No error if the state does not exist
if err := a.purgeCheckState(cid); err != nil {
t.Fatalf("err: %s", err)
}
// Persist some state to the data dir
check := &checks.CheckTTL{
CheckID: cid,
TTL: time.Minute,
}
err := a.persistCheckState(check, api.HealthPassing, "yup")
if err != nil {
t.Fatalf("err: %s", err)
}
// Purge the check state
if err := a.purgeCheckState(cid); err != nil {
t.Fatalf("err: %s", err)
}
// Removed the file
file := filepath.Join(a.Config.DataDir, checkStateDir, cid.StringHash())
if _, err := os.Stat(file); !os.IsNotExist(err) {
t.Fatalf("should have removed file")
}
}
func TestAgent_GetCoordinate(t *testing.T) {
t.Parallel()
check := func(server bool) {
a := NewTestAgent(t, t.Name(), `
server = true
`)
defer a.Shutdown()
// This doesn't verify the returned coordinate, but it makes
// sure that the agent chooses the correct Serf instance,
// depending on how it's configured as a client or a server.
// If it chooses the wrong one, this will crash.
if _, err := a.GetLANCoordinate(); err != nil {
t.Fatalf("err: %s", err)
}
}
check(true)
check(false)
}
func TestAgent_reloadWatches(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
// Normal watch with http addr set, should succeed
newConf := *a.config
newConf.Watches = []map[string]interface{}{
{
"type": "key",
"key": "asdf",
"args": []interface{}{"ls"},
},
}
if err := a.reloadWatches(&newConf); err != nil {
t.Fatalf("bad: %s", err)
}
// Should fail to reload with connect watches
newConf.Watches = []map[string]interface{}{
{
"type": "connect_roots",
"key": "asdf",
"args": []interface{}{"ls"},
},
}
if err := a.reloadWatches(&newConf); err == nil || !strings.Contains(err.Error(), "not allowed in agent config") {
t.Fatalf("bad: %s", err)
}
// Should still succeed with only HTTPS addresses
newConf.HTTPSAddrs = newConf.HTTPAddrs
newConf.HTTPAddrs = make([]net.Addr, 0)
newConf.Watches = []map[string]interface{}{
{
"type": "key",
"key": "asdf",
"args": []interface{}{"ls"},
},
}
if err := a.reloadWatches(&newConf); err != nil {
t.Fatalf("bad: %s", err)
}
// Should fail to reload with no http or https addrs
newConf.HTTPSAddrs = make([]net.Addr, 0)
newConf.Watches = []map[string]interface{}{
{
"type": "key",
"key": "asdf",
"args": []interface{}{"ls"},
},
}
if err := a.reloadWatches(&newConf); err == nil || !strings.Contains(err.Error(), "watch plans require an HTTP or HTTPS endpoint") {
t.Fatalf("bad: %s", err)
}
}
func TestAgent_reloadWatchesHTTPS(t *testing.T) {
t.Parallel()
a := TestAgent{Name: t.Name(), UseTLS: true}
if err := a.Start(); err != nil {
t.Fatal(err)
}
defer a.Shutdown()
// Normal watch with http addr set, should succeed
newConf := *a.config
newConf.Watches = []map[string]interface{}{
{
"type": "key",
"key": "asdf",
"args": []interface{}{"ls"},
},
}
if err := a.reloadWatches(&newConf); err != nil {
t.Fatalf("bad: %s", err)
}
}
func TestAgent_loadTokens(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), `
acl = {
enabled = true
tokens = {
agent = "alfa"
agent_master = "bravo",
default = "charlie"
replication = "delta"
}
}
`)
defer a.Shutdown()
require := require.New(t)
tokensFullPath := filepath.Join(a.config.DataDir, tokensPath)
t.Run("original-configuration", func(t *testing.T) {
require.Equal("alfa", a.tokens.AgentToken())
require.Equal("bravo", a.tokens.AgentMasterToken())
require.Equal("charlie", a.tokens.UserToken())
require.Equal("delta", a.tokens.ReplicationToken())
})
t.Run("updated-configuration", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "echo",
ACLAgentToken: "foxtrot",
ACLAgentMasterToken: "golf",
ACLReplicationToken: "hotel",
}
// ensures no error for missing persisted tokens file
require.NoError(a.loadTokens(cfg))
require.Equal("echo", a.tokens.UserToken())
require.Equal("foxtrot", a.tokens.AgentToken())
require.Equal("golf", a.tokens.AgentMasterToken())
require.Equal("hotel", a.tokens.ReplicationToken())
})
t.Run("persisted-tokens", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "echo",
ACLAgentToken: "foxtrot",
ACLAgentMasterToken: "golf",
ACLReplicationToken: "hotel",
}
tokens := `{
"agent" : "india",
"agent_master" : "juliett",
"default": "kilo",
"replication" : "lima"
}`
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
require.NoError(a.loadTokens(cfg))
// no updates since token persistence is not enabled
require.Equal("echo", a.tokens.UserToken())
require.Equal("foxtrot", a.tokens.AgentToken())
require.Equal("golf", a.tokens.AgentMasterToken())
require.Equal("hotel", a.tokens.ReplicationToken())
a.config.ACLEnableTokenPersistence = true
require.NoError(a.loadTokens(cfg))
require.Equal("india", a.tokens.AgentToken())
require.Equal("juliett", a.tokens.AgentMasterToken())
require.Equal("kilo", a.tokens.UserToken())
require.Equal("lima", a.tokens.ReplicationToken())
})
t.Run("persisted-tokens-override", func(t *testing.T) {
tokens := `{
"agent" : "mike",
"agent_master" : "november",
"default": "oscar",
"replication" : "papa"
}`
cfg := &config.RuntimeConfig{
ACLToken: "quebec",
ACLAgentToken: "romeo",
ACLAgentMasterToken: "sierra",
ACLReplicationToken: "tango",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
require.NoError(a.loadTokens(cfg))
require.Equal("mike", a.tokens.AgentToken())
require.Equal("november", a.tokens.AgentMasterToken())
require.Equal("oscar", a.tokens.UserToken())
require.Equal("papa", a.tokens.ReplicationToken())
})
t.Run("partial-persisted", func(t *testing.T) {
tokens := `{
"agent" : "uniform",
"agent_master" : "victor"
}`
cfg := &config.RuntimeConfig{
ACLToken: "whiskey",
ACLAgentToken: "xray",
ACLAgentMasterToken: "yankee",
ACLReplicationToken: "zulu",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte(tokens), 0600))
require.NoError(a.loadTokens(cfg))
require.Equal("uniform", a.tokens.AgentToken())
require.Equal("victor", a.tokens.AgentMasterToken())
require.Equal("whiskey", a.tokens.UserToken())
require.Equal("zulu", a.tokens.ReplicationToken())
})
t.Run("persistence-error-not-json", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "one",
ACLAgentToken: "two",
ACLAgentMasterToken: "three",
ACLReplicationToken: "four",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}, 0600))
err := a.loadTokens(cfg)
require.Error(err)
require.Equal("one", a.tokens.UserToken())
require.Equal("two", a.tokens.AgentToken())
require.Equal("three", a.tokens.AgentMasterToken())
require.Equal("four", a.tokens.ReplicationToken())
})
t.Run("persistence-error-wrong-top-level", func(t *testing.T) {
cfg := &config.RuntimeConfig{
ACLToken: "alfa",
ACLAgentToken: "bravo",
ACLAgentMasterToken: "charlie",
ACLReplicationToken: "foxtrot",
}
require.NoError(ioutil.WriteFile(tokensFullPath, []byte("[1,2,3]"), 0600))
err := a.loadTokens(cfg)
require.Error(err)
require.Equal("alfa", a.tokens.UserToken())
require.Equal("bravo", a.tokens.AgentToken())
require.Equal("charlie", a.tokens.AgentMasterToken())
require.Equal("foxtrot", a.tokens.ReplicationToken())
})
}
func TestAgent_ReloadConfigOutgoingRPCConfig(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
hcl := `
data_dir = "` + dataDir + `"
verify_outgoing = true
ca_file = "../test/ca/root.cer"
cert_file = "../test/key/ourdomain.cer"
key_file = "../test/key/ourdomain.key"
verify_server_hostname = false
`
a := NewTestAgent(t, t.Name(), hcl)
defer a.Shutdown()
tlsConf := a.tlsConfigurator.OutgoingRPCConfig()
require.True(t, tlsConf.InsecureSkipVerify)
require.Len(t, tlsConf.ClientCAs.Subjects(), 1)
require.Len(t, tlsConf.RootCAs.Subjects(), 1)
hcl = `
data_dir = "` + dataDir + `"
verify_outgoing = true
ca_path = "../test/ca_path"
cert_file = "../test/key/ourdomain.cer"
key_file = "../test/key/ourdomain.key"
verify_server_hostname = true
`
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.ReloadConfig(c))
tlsConf = a.tlsConfigurator.OutgoingRPCConfig()
require.False(t, tlsConf.InsecureSkipVerify)
require.Len(t, tlsConf.RootCAs.Subjects(), 2)
require.Len(t, tlsConf.ClientCAs.Subjects(), 2)
}
func TestAgent_ReloadConfigIncomingRPCConfig(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
hcl := `
data_dir = "` + dataDir + `"
verify_outgoing = true
ca_file = "../test/ca/root.cer"
cert_file = "../test/key/ourdomain.cer"
key_file = "../test/key/ourdomain.key"
verify_server_hostname = false
`
a := NewTestAgent(t, t.Name(), hcl)
defer a.Shutdown()
tlsConf := a.tlsConfigurator.IncomingRPCConfig()
require.NotNil(t, tlsConf.GetConfigForClient)
tlsConf, err := tlsConf.GetConfigForClient(nil)
require.NoError(t, err)
require.NotNil(t, tlsConf)
require.True(t, tlsConf.InsecureSkipVerify)
require.Len(t, tlsConf.ClientCAs.Subjects(), 1)
require.Len(t, tlsConf.RootCAs.Subjects(), 1)
hcl = `
data_dir = "` + dataDir + `"
verify_outgoing = true
ca_path = "../test/ca_path"
cert_file = "../test/key/ourdomain.cer"
key_file = "../test/key/ourdomain.key"
verify_server_hostname = true
`
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
require.NoError(t, a.ReloadConfig(c))
tlsConf, err = tlsConf.GetConfigForClient(nil)
require.NoError(t, err)
require.False(t, tlsConf.InsecureSkipVerify)
require.Len(t, tlsConf.ClientCAs.Subjects(), 2)
require.Len(t, tlsConf.RootCAs.Subjects(), 2)
}
func TestAgent_ReloadConfigTLSConfigFailure(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
hcl := `
data_dir = "` + dataDir + `"
verify_outgoing = true
ca_file = "../test/ca/root.cer"
cert_file = "../test/key/ourdomain.cer"
key_file = "../test/key/ourdomain.key"
verify_server_hostname = false
`
a := NewTestAgent(t, t.Name(), hcl)
defer a.Shutdown()
tlsConf := a.tlsConfigurator.IncomingRPCConfig()
hcl = `
data_dir = "` + dataDir + `"
verify_incoming = true
`
c := TestConfig(testutil.Logger(t), config.Source{Name: t.Name(), Format: "hcl", Data: hcl})
require.Error(t, a.ReloadConfig(c))
tlsConf, err := tlsConf.GetConfigForClient(nil)
require.NoError(t, err)
require.Equal(t, tls.NoClientCert, tlsConf.ClientAuth)
require.Len(t, tlsConf.ClientCAs.Subjects(), 1)
require.Len(t, tlsConf.RootCAs.Subjects(), 1)
}
func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) {
t.Parallel()
dataDir := testutil.TempDir(t, "agent") // we manage the data dir
defer os.RemoveAll(dataDir)
hcl := `
data_dir = "` + dataDir + `"
verify_incoming = true
ca_file = "../test/ca/root.cer"
cert_file = "../test/key/ourdomain.cer"
key_file = "../test/key/ourdomain.key"
auto_encrypt { allow_tls = true }
`
a := NewTestAgent(t, t.Name(), hcl)
defer a.Shutdown()
require.True(t, a.consulConfig().AutoEncryptAllowTLS)
}
func TestAgent_consulConfig_RaftTrailingLogs(t *testing.T) {
t.Parallel()
hcl := `
raft_trailing_logs = 812345
`
a := NewTestAgent(t, t.Name(), hcl)
defer a.Shutdown()
require.Equal(t, uint64(812345), a.consulConfig().RaftConfig.TrailingLogs)
}
func TestAgent_grpcInjectAddr(t *testing.T) {
tt := []struct {
name string
grpc string
ip string
port int
want string
}{
{
name: "localhost web svc",
grpc: "localhost:8080/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "localhost no svc",
grpc: "localhost:8080",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090",
},
{
name: "ipv4 web svc",
grpc: "127.0.0.1:8080/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "ipv4 no svc",
grpc: "127.0.0.1:8080",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090",
},
{
name: "ipv6 no svc",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090",
},
{
name: "ipv6 web svc",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "zone ipv6 web svc",
grpc: "::FFFF:C0A8:1%1:5000/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "ipv6 literal web svc",
grpc: "::FFFF:192.168.0.1:5000/web",
ip: "192.168.0.0",
port: 9090,
want: "192.168.0.0:9090/web",
},
{
name: "ipv6 injected into ipv6 url",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "::FFFF:C0A8:1:9090",
},
{
name: "ipv6 injected into ipv6 url with svc",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/web",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "::FFFF:C0A8:1:9090/web",
},
{
name: "ipv6 injected into ipv6 url with special",
grpc: "2001:db8:1f70::999:de8:7648:6e8:5000/service-$name:with@special:Chars",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "::FFFF:C0A8:1:9090/service-$name:with@special:Chars",
},
}
for _, tt := range tt {
t.Run(tt.name, func(t *testing.T) {
got := grpcInjectAddr(tt.grpc, tt.ip, tt.port)
if got != tt.want {
t.Errorf("httpInjectAddr() got = %v, want %v", got, tt.want)
}
})
}
}
func TestAgent_httpInjectAddr(t *testing.T) {
tt := []struct {
name string
url string
ip string
port int
want string
}{
{
name: "localhost health",
url: "http://localhost:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "http://192.168.0.0:9090/health",
},
{
name: "https localhost health",
url: "https://localhost:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv4 health",
url: "https://127.0.0.1:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv4 without path",
url: "https://127.0.0.1:8080",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090",
},
{
name: "https ipv6 health",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv6 with zone",
url: "https://[::FFFF:C0A8:1%1]:5000/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv6 literal",
url: "https://[::FFFF:192.168.0.1]:5000/health",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090/health",
},
{
name: "https ipv6 without path",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000",
ip: "192.168.0.0",
port: 9090,
want: "https://192.168.0.0:9090",
},
{
name: "ipv6 injected into ipv6 url",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000",
ip: "::FFFF:C0A8:1",
port: 9090,
want: "https://[::FFFF:C0A8:1]:9090",
},
{
name: "ipv6 with brackets injected into ipv6 url",
url: "https://[2001:db8:1f70::999:de8:7648:6e8]:5000",
ip: "[::FFFF:C0A8:1]",
port: 9090,
want: "https://[::FFFF:C0A8:1]:9090",
},
{
name: "short domain health",
url: "http://i.co:8080/health",
ip: "192.168.0.0",
port: 9090,
want: "http://192.168.0.0:9090/health",
},
{
name: "nested url in query",
url: "http://my.corp.com:8080/health?from=http://google.com:8080",
ip: "192.168.0.0",
port: 9090,
want: "http://192.168.0.0:9090/health?from=http://google.com:8080",
},
}
for _, tt := range tt {
t.Run(tt.name, func(t *testing.T) {
got := httpInjectAddr(tt.url, tt.ip, tt.port)
if got != tt.want {
t.Errorf("httpInjectAddr() got = %v, want %v", got, tt.want)
}
})
}
}
func TestDefaultIfEmpty(t *testing.T) {
require.Equal(t, "", defaultIfEmpty("", ""))
require.Equal(t, "foo", defaultIfEmpty("", "foo"))
require.Equal(t, "bar", defaultIfEmpty("bar", "foo"))
require.Equal(t, "bar", defaultIfEmpty("bar", ""))
}
func TestConfigSourceFromName(t *testing.T) {
cases := []struct {
in string
expect configSource
bad bool
}{
{in: "local", expect: ConfigSourceLocal},
{in: "remote", expect: ConfigSourceRemote},
{in: "", expect: ConfigSourceLocal},
{in: "LOCAL", bad: true},
{in: "REMOTE", bad: true},
{in: "garbage", bad: true},
{in: " ", bad: true},
}
for _, tc := range cases {
tc := tc
t.Run(tc.in, func(t *testing.T) {
got, ok := ConfigSourceFromName(tc.in)
if tc.bad {
require.False(t, ok)
require.Empty(t, got)
} else {
require.True(t, ok)
require.Equal(t, tc.expect, got)
}
})
}
}
func TestAgent_RerouteExistingHTTPChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register a service without a ProxyAddr
svc := &structs.NodeService{
ID: "web",
Service: "web",
Address: "localhost",
Port: 8080,
}
chks := []*structs.CheckType{
{
CheckID: "http",
HTTP: "http://localhost:8080/mypath?query",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
{
CheckID: "grpc",
GRPC: "localhost:8080/myservice",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
}
if err := a.AddService(svc, chks, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
// Register a proxy and expose HTTP checks
// This should trigger setting ProxyHTTP and ProxyGRPC in the checks
proxy := &structs.NodeService{
Kind: "connect-proxy",
ID: "web-proxy",
Service: "web-proxy",
Address: "localhost",
Port: 21500,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "localhost",
LocalServicePort: 8080,
MeshGateway: structs.MeshGatewayConfig{},
Expose: structs.ExposeConfig{
Checks: true,
},
},
}
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil))
got := chks[0].ProxyHTTP
if got == "" {
r.Fatal("proxyHTTP addr not set in check")
}
want := "http://localhost:21500/mypath?query"
if got != want {
r.Fatalf("unexpected proxy addr in check, want: %s, got: %s", want, got)
}
})
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil))
// Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks
got := chks[1].ProxyGRPC
if got == "" {
r.Fatal("ProxyGRPC addr not set in check")
}
// Node that this relies on listener ports auto-incrementing in a.listenerPortLocked
want := "localhost:21501/myservice"
if got != want {
r.Fatalf("unexpected proxy addr in check, want: %s, got: %s", want, got)
}
})
// Re-register a proxy and disable exposing HTTP checks
// This should trigger resetting ProxyHTTP and ProxyGRPC to empty strings
proxy = &structs.NodeService{
Kind: "connect-proxy",
ID: "web-proxy",
Service: "web-proxy",
Address: "localhost",
Port: 21500,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "localhost",
LocalServicePort: 8080,
MeshGateway: structs.MeshGatewayConfig{},
Expose: structs.ExposeConfig{
Checks: false,
},
},
}
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil))
got := chks[0].ProxyHTTP
if got != "" {
r.Fatal("ProxyHTTP addr was not reset")
}
})
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil))
// Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks
got := chks[1].ProxyGRPC
if got != "" {
r.Fatal("ProxyGRPC addr was not reset")
}
})
}
func TestAgent_RerouteNewHTTPChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t, t.Name(), "")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")
// Register a service without a ProxyAddr
svc := &structs.NodeService{
ID: "web",
Service: "web",
Address: "localhost",
Port: 8080,
}
if err := a.AddService(svc, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
// Register a proxy and expose HTTP checks
proxy := &structs.NodeService{
Kind: "connect-proxy",
ID: "web-proxy",
Service: "web-proxy",
Address: "localhost",
Port: 21500,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web",
LocalServiceAddress: "localhost",
LocalServicePort: 8080,
MeshGateway: structs.MeshGatewayConfig{},
Expose: structs.ExposeConfig{
Checks: true,
},
},
}
if err := a.AddService(proxy, nil, false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add svc: %v", err)
}
checks := []*structs.HealthCheck{
{
CheckID: "http",
Name: "http",
ServiceID: "web",
Status: api.HealthCritical,
},
{
CheckID: "grpc",
Name: "grpc",
ServiceID: "web",
Status: api.HealthCritical,
},
}
chkTypes := []*structs.CheckType{
{
CheckID: "http",
HTTP: "http://localhost:8080/mypath?query",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
{
CheckID: "grpc",
GRPC: "localhost:8080/myservice",
Interval: 20 * time.Millisecond,
TLSSkipVerify: true,
},
}
// ProxyGRPC and ProxyHTTP should be set when creating check
// since proxy.expose.checks is enabled on the proxy
if err := a.AddCheck(checks[0], chkTypes[0], false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add check: %v", err)
}
if err := a.AddCheck(checks[1], chkTypes[1], false, "", ConfigSourceLocal); err != nil {
t.Fatalf("failed to add check: %v", err)
}
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil))
got := chks[0].ProxyHTTP
if got == "" {
r.Fatal("ProxyHTTP addr not set in check")
}
want := "http://localhost:21500/mypath?query"
if got != want {
r.Fatalf("unexpected proxy addr in http check, want: %s, got: %s", want, got)
}
})
retry.Run(t, func(r *retry.R) {
chks := a.ServiceHTTPBasedChecks(structs.NewServiceID("web", nil))
// Will be at a later index than HTTP check because of the fetching order in ServiceHTTPBasedChecks
got := chks[1].ProxyGRPC
if got == "" {
r.Fatal("ProxyGRPC addr not set in check")
}
want := "localhost:21501/myservice"
if got != want {
r.Fatalf("unexpected proxy addr in grpc check, want: %s, got: %s", want, got)
}
})
}
func TestAgentCache_serviceInConfigFile_initialFetchErrors_Issue6521(t *testing.T) {
t.Parallel()
// Ensure that initial failures to fetch the discovery chain via the agent
// cache using the notify API for a service with no config entries
// correctly recovers when those RPCs resume working. The key here is that
// the lack of config entries guarantees that the RPC will come back with a
// synthetic index of 1.
//
// The bug in the Cache.notifyBlockingQuery used to incorrectly "fix" the
// index for the next query from 0 to 1 for all queries, when it should
// have not done so for queries that errored.
a1 := NewTestAgent(t, t.Name()+"-a1", "")
defer a1.Shutdown()
testrpc.WaitForLeader(t, a1.RPC, "dc1")
a2 := NewTestAgent(t, t.Name()+"-a2", `
server = false
bootstrap = false
services {
name = "echo-client"
port = 8080
connect {
sidecar_service {
proxy {
upstreams {
destination_name = "echo"
local_bind_port = 9191
}
}
}
}
}
services {
name = "echo"
port = 9090
connect {
sidecar_service {}
}
}
`)
defer a2.Shutdown()
// Starting a client agent disconnected from a server with services.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan cache.UpdateEvent, 1)
require.NoError(t, a2.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
Datacenter: "dc1",
Name: "echo",
EvaluateInDatacenter: "dc1",
EvaluateInNamespace: "default",
}, "foo", ch))
{ // The first event is an error because we are not joined yet.
evt := <-ch
require.Equal(t, "foo", evt.CorrelationID)
require.Nil(t, evt.Result)
require.Error(t, evt.Err)
require.Equal(t, evt.Err, structs.ErrNoServers)
}
t.Logf("joining client to server")
// Now connect to server
_, err := a1.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN),
})
require.NoError(t, err)
t.Logf("joined client to server")
deadlineCh := time.After(10 * time.Second)
start := time.Now()
LOOP:
for {
select {
case evt := <-ch:
// We may receive several notifications of an error until we get the
// first successful reply.
require.Equal(t, "foo", evt.CorrelationID)
if evt.Err != nil {
break LOOP
}
require.NoError(t, evt.Err)
require.NotNil(t, evt.Result)
t.Logf("took %s to get first success", time.Since(start))
case <-deadlineCh:
t.Fatal("did not get notified successfully")
}
}
}