feat(v2): add consul service and workloads to catalog (#20077)

pull/20058/head
Dan Stough 2024-01-03 15:14:42 -05:00 committed by GitHub
parent 8e2d4e3aaf
commit 073959866d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 2335 additions and 1096 deletions

View File

@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"sync"
@ -17,6 +16,7 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/google/go-cmp/cmp"
"github.com/oklog/ulid/v2"
"golang.org/x/time/rate"
"google.golang.org/protobuf/types/known/anypb"
@ -36,9 +36,9 @@ import (
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
pbtenancy "github.com/hashicorp/consul/proto-public/pbtenancy/v2beta1"
"github.com/hashicorp/consul/types"
)
var LeaderSummaries = []prometheus.SummaryDefinition{
@ -355,6 +355,12 @@ func (s *Server) establishLeadership(ctx context.Context) error {
}
}
if s.useV2Resources {
if err := s.initConsulService(ctx, s.insecureResourceServiceClient); err != nil {
return err
}
}
if s.config.Reporting.License.Enabled && s.reportingManager != nil {
s.reportingManager.StartReportingAgent()
}
@ -958,13 +964,21 @@ func (s *Server) reconcileReaped(known map[string]struct{}, nodeEntMeta *acl.Ent
}
// Attempt to reap this member
if err := s.handleReapMember(member, nodeEntMeta); err != nil {
if err := s.registrator.HandleReapMember(member, nodeEntMeta, s.removeConsulServer); err != nil {
return err
}
}
return nil
}
// ConsulRegistrator is an interface that manages the catalog registration lifecycle of Consul servers from serf events.
type ConsulRegistrator interface {
HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error
HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error
HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error
HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error
}
// reconcileMember is used to do an async reconcile of a single
// serf member
func (s *Server) reconcileMember(member serf.Member) error {
@ -983,13 +997,13 @@ func (s *Server) reconcileMember(member serf.Member) error {
var err error
switch member.Status {
case serf.StatusAlive:
err = s.handleAliveMember(member, nodeEntMeta)
err = s.registrator.HandleAliveMember(member, nodeEntMeta, s.joinConsulServer)
case serf.StatusFailed:
err = s.handleFailedMember(member, nodeEntMeta)
err = s.registrator.HandleFailedMember(member, nodeEntMeta)
case serf.StatusLeft:
err = s.handleLeftMember(member, nodeEntMeta)
err = s.registrator.HandleLeftMember(member, nodeEntMeta, s.removeConsulServer)
case StatusReap:
err = s.handleReapMember(member, nodeEntMeta)
err = s.registrator.HandleReapMember(member, nodeEntMeta, s.removeConsulServer)
}
if err != nil {
s.logger.Error("failed to reconcile member",
@ -1020,254 +1034,6 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
return false
}
// handleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Register consul service if a server
var service *structs.NodeService
if valid, parts := metadata.IsConsulServer(member); valid {
service = &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Port: parts.Port,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: *nodeEntMeta,
Meta: map[string]string{
// DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul
"non_voter": strconv.FormatBool(member.Tags["nonvoter"] == "1"),
"read_replica": strconv.FormatBool(member.Tags["read_replica"] == "1"),
"raft_version": strconv.Itoa(parts.RaftVersion),
"serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10),
"serf_protocol_min": strconv.FormatUint(uint64(member.ProtocolMin), 10),
"serf_protocol_max": strconv.FormatUint(uint64(member.ProtocolMax), 10),
"version": parts.Build.String(),
},
}
if parts.ExternalGRPCPort > 0 {
service.Meta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort)
}
if parts.ExternalGRPCTLSPort > 0 {
service.Meta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort)
}
// Attempt to join the consul server
if err := s.joinConsulServer(member, parts); err != nil {
return err
}
}
// Check if the node exists
state := s.fsm.State()
_, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if node != nil && node.Address == member.Addr.String() {
// Check if the associated service is available
if service != nil {
match := false
_, services, err := state.NodeServices(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if services != nil {
for id, serv := range services.Services {
if id == service.ID {
// If metadata are different, be sure to update it
match = reflect.DeepEqual(serv.Meta, service.Meta)
}
}
}
if !match {
goto AFTER_CHECK
}
}
// Check if the serfCheck is in the passing state
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
for _, check := range checks {
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing {
return nil
}
}
}
AFTER_CHECK:
s.logger.Info("member joined, marking health alive",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// Get consul version from serf member
// add this as node meta in catalog register request
buildVersion, err := metadata.Build(&member)
if err != nil {
return err
}
// Register with the catalog.
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Service: service,
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthPassing,
Output: structs.SerfCheckAliveOutput,
},
EnterpriseMeta: *nodeEntMeta,
NodeMeta: map[string]string{
structs.MetaConsulVersion: buildVersion.String(),
},
}
if node != nil {
req.TaggedAddresses = node.TaggedAddresses
req.NodeMeta = node.Meta
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err
}
// handleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (s *Server) handleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Check if the node exists
state := s.fsm.State()
_, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if node == nil {
s.logger.Info("ignoring failed event for member because it does not exist in the catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
if node.Address == member.Addr.String() {
// Check if the serfCheck is in the critical state
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
for _, check := range checks {
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical {
return nil
}
}
}
s.logger.Info("member failed, marking health critical",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// Register with the catalog
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
EnterpriseMeta: *nodeEntMeta,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: structs.SerfCheckFailedOutput,
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
}
_, err = s.raftApply(structs.RegisterRequestType, &req)
return err
}
// handleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func (s *Server) handleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
return s.handleDeregisterMember("left", member, nodeEntMeta)
}
// handleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (s *Server) handleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
return s.handleDeregisterMember("reaped", member, nodeEntMeta)
}
// handleDeregisterMember is used to deregister a member of a given reason
func (s *Server) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
//
// TODO(partitions): check partitions here too? server names should be unique in general though
if strings.EqualFold(member.Name, s.config.NodeName) {
s.logger.Warn("deregistering self should be done by follower",
"name", s.config.NodeName,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
// Remove from Raft peers if this was a server
if valid, _ := metadata.IsConsulServer(member); valid {
if err := s.removeConsulServer(member); err != nil {
return err
}
}
// Check if the node does not exist
state := s.fsm.State()
_, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if node == nil {
return nil
}
// Deregister the node
s.logger.Info("deregistering member",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
"reason", reason,
)
req := structs.DeregisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
EnterpriseMeta: *nodeEntMeta,
}
_, err = s.raftApply(structs.DeregisterRequestType, &req)
return err
}
// joinConsulServer is used to try to join another consul server
func (s *Server) joinConsulServer(m serf.Member, parts *metadata.Server) error {
// Check for possibility of multiple bootstrap nodes
@ -1464,6 +1230,66 @@ func (s *serversIntentionsAsConfigEntriesInfo) update(srv *metadata.Server) bool
return false
}
func (s *Server) initConsulService(ctx context.Context, client pbresource.ResourceServiceClient) error {
service := &pbcatalog.Service{
Workloads: &pbcatalog.WorkloadSelector{
Prefixes: []string{consulWorkloadPrefix},
},
Ports: []*pbcatalog.ServicePort{
{
TargetPort: consulPortNameServer,
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
// No virtual port defined for now, as we assume this is generally for Service Discovery
},
},
}
serviceData, err := anypb.New(service)
if err != nil {
return fmt.Errorf("could not convert Service to `any` message: %w", err)
}
// create a default namespace in default partition
serviceID := &pbresource.ID{
Type: pbcatalog.ServiceType,
Name: structs.ConsulServiceName,
Tenancy: resource.DefaultNamespacedTenancy(),
}
serviceResource := &pbresource.Resource{
Id: serviceID,
Data: serviceData,
}
res, err := client.Read(ctx, &pbresource.ReadRequest{Id: serviceID})
if err != nil && !grpcNotFoundErr(err) {
return fmt.Errorf("failed to read the %s Service: %w", structs.ConsulServiceName, err)
}
if err == nil {
existingService := res.GetResource()
s.logger.Debug("existingService consul Service found")
// If the Service is identical, we're done.
if cmp.Equal(serviceResource, existingService, resourceCmpOptions...) {
s.logger.Debug("no updates to perform on consul Service")
return nil
}
// If the existing Service is different, add the Version to the patch for CAS write.
serviceResource.Id = existingService.Id
serviceResource.Version = existingService.Version
}
_, err = client.Write(ctx, &pbresource.WriteRequest{Resource: serviceResource})
if err != nil {
return fmt.Errorf("failed to create the %s service: %w", structs.ConsulServiceName, err)
}
s.logger.Info("Created consul Service in catalog")
return nil
}
func (s *Server) initTenancy(ctx context.Context, b storage.Backend) error {
// we write these defaults directly to the storage backend
// without going through the resource service since tenancy

View File

@ -0,0 +1,279 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"reflect"
"strconv"
"strings"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
)
var _ ConsulRegistrator = (*V1ConsulRegistrator)(nil)
type V1ConsulRegistrator struct {
Datacenter string
FSM *fsm.FSM
Logger hclog.Logger
NodeName string
RaftApplyFunc func(t structs.MessageType, msg any) (any, error)
}
// HandleAliveMember is used to ensure the node
// is registered, with a passing health check.
func (r V1ConsulRegistrator) HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Register consul service if a server
var service *structs.NodeService
if valid, parts := metadata.IsConsulServer(member); valid {
service = &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Port: parts.Port,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
EnterpriseMeta: *nodeEntMeta,
Meta: map[string]string{
// DEPRECATED - remove nonvoter in favor of read_replica in a future version of consul
"non_voter": strconv.FormatBool(member.Tags["nonvoter"] == "1"),
"read_replica": strconv.FormatBool(member.Tags["read_replica"] == "1"),
"raft_version": strconv.Itoa(parts.RaftVersion),
"serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10),
"serf_protocol_min": strconv.FormatUint(uint64(member.ProtocolMin), 10),
"serf_protocol_max": strconv.FormatUint(uint64(member.ProtocolMax), 10),
"version": parts.Build.String(),
},
}
if parts.ExternalGRPCPort > 0 {
service.Meta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort)
}
if parts.ExternalGRPCTLSPort > 0 {
service.Meta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort)
}
// Attempt to join the consul server
if err := joinServer(member, parts); err != nil {
return err
}
}
// Check if the node exists
state := r.FSM.State()
_, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if node != nil && node.Address == member.Addr.String() {
// Check if the associated service is available
if service != nil {
match := false
_, services, err := state.NodeServices(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if services != nil {
for id, serv := range services.Services {
if id == service.ID {
// If metadata are different, be sure to update it
match = reflect.DeepEqual(serv.Meta, service.Meta)
}
}
}
if !match {
goto AFTER_CHECK
}
}
// Check if the serfCheck is in the passing state
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
for _, check := range checks {
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthPassing {
return nil
}
}
}
AFTER_CHECK:
r.Logger.Info("member joined, marking health alive",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// Get consul version from serf member
// add this as node meta in catalog register request
buildVersion, err := metadata.Build(&member)
if err != nil {
return err
}
// Register with the catalog.
req := structs.RegisterRequest{
Datacenter: r.Datacenter,
Node: member.Name,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Service: service,
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthPassing,
Output: structs.SerfCheckAliveOutput,
},
EnterpriseMeta: *nodeEntMeta,
NodeMeta: map[string]string{
structs.MetaConsulVersion: buildVersion.String(),
},
}
if node != nil {
req.TaggedAddresses = node.TaggedAddresses
req.NodeMeta = node.Meta
}
_, err = r.RaftApplyFunc(structs.RegisterRequestType, &req)
return err
}
// HandleFailedMember is used to mark the node's status
// as being critical, along with all checks as unknown.
func (r V1ConsulRegistrator) HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Check if the node exists
state := r.FSM.State()
_, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if node == nil {
r.Logger.Info("ignoring failed event for member because it does not exist in the catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
if node.Address == member.Addr.String() {
// Check if the serfCheck is in the critical state
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
for _, check := range checks {
if check.CheckID == structs.SerfCheckID && check.Status == api.HealthCritical {
return nil
}
}
}
r.Logger.Info("member failed, marking health critical",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// Register with the catalog
req := structs.RegisterRequest{
Datacenter: r.Datacenter,
Node: member.Name,
EnterpriseMeta: *nodeEntMeta,
ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Check: &structs.HealthCheck{
Node: member.Name,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: structs.SerfCheckFailedOutput,
},
// If there's existing information about the node, do not
// clobber it.
SkipNodeUpdate: true,
}
_, err = r.RaftApplyFunc(structs.RegisterRequestType, &req)
return err
}
// HandleLeftMember is used to handle members that gracefully
// left. They are deregistered if necessary.
func (r V1ConsulRegistrator) HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
return r.handleDeregisterMember("left", member, nodeEntMeta, removeServerFunc)
}
// HandleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are deregistered.
func (r V1ConsulRegistrator) HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
return r.handleDeregisterMember("reaped", member, nodeEntMeta, removeServerFunc)
}
// handleDeregisterMember is used to deregister a member of a given reason
func (r V1ConsulRegistrator) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Do not deregister ourself. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// deregister us later.
//
// TODO(partitions): check partitions here too? server names should be unique in general though
if strings.EqualFold(member.Name, r.NodeName) {
r.Logger.Warn("deregistering self should be done by follower",
"name", r.NodeName,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
// Remove from Raft peers if this was a server
if valid, _ := metadata.IsConsulServer(member); valid {
if err := removeServerFunc(member); err != nil {
return err
}
}
// Check if the node does not exist
state := r.FSM.State()
_, node, err := state.GetNode(member.Name, nodeEntMeta, structs.DefaultPeerKeyword)
if err != nil {
return err
}
if node == nil {
return nil
}
// Deregister the node
r.Logger.Info("deregistering member",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
"reason", reason,
)
req := structs.DeregisterRequest{
Datacenter: r.Datacenter,
Node: member.Name,
EnterpriseMeta: *nodeEntMeta,
}
_, err = r.RaftApplyFunc(structs.DeregisterRequestType, &req)
return err
}

View File

@ -0,0 +1,887 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"context"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
)
func TestLeader_RegisterMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Status != api.HealthPassing {
t.Fatalf("bad check: %v", checks[0])
}
// Server should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatalf("server not registered")
}
})
// Service should be registered
_, services, err := state.NodeServices(nil, s1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services)
}
}
func TestLeader_FailedMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Try to join
joinLAN(t, c1, s1)
// Fail the member
c1.Shutdown()
// Should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
retry.Run(t, func(r *retry.R) {
_, checks, err = state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if len(checks) != 1 {
r.Fatalf("client missing check")
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got status %q want %q", got, want)
}
})
}
func TestLeader_LeftMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(r, err)
require.NotNil(r, node, "client not registered")
})
// Node should leave
c1.Leave()
c1.Shutdown()
// Should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(r, err)
require.Nil(r, node, "client still registered")
})
}
func TestLeader_ReapMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(r, err)
require.NotNil(r, node, "client not registered")
})
// Simulate a node reaping
mems := s1.LANMembersInAgentPartition()
var c1mem serf.Member
for _, m := range mems {
if m.Name == c1.config.NodeName {
c1mem = m
c1mem.Status = StatusReap
break
}
}
s1.reconcileCh <- c1mem
// Should be deregistered; we have to poll quickly here because
// anti-entropy will put it back.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(t, err)
if node == nil {
reaped = true
break
}
}
if !reaped {
t.Fatalf("client should not be registered")
}
}
func TestLeader_ReapOrLeftMember_IgnoreSelf(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
run := func(t *testing.T, status serf.MemberStatus, nameFn func(string) string) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
nodeName := s1.config.NodeName
if nameFn != nil {
nodeName = nameFn(nodeName)
}
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(nodeName, nil, "")
require.NoError(r, err)
require.NotNil(r, node, "server not registered")
})
// Simulate THIS node reaping or leaving
mems := s1.LANMembersInAgentPartition()
var s1mem serf.Member
for _, m := range mems {
if strings.EqualFold(m.Name, nodeName) {
s1mem = m
s1mem.Status = status
s1mem.Name = nodeName
break
}
}
s1.reconcileCh <- s1mem
// Should NOT be deregistered; we have to poll quickly here because
// anti-entropy will put it back if it did get deleted.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(nodeName, nil, "")
require.NoError(t, err)
if node == nil {
reaped = true
break
}
}
if reaped {
t.Fatalf("server should still be registered")
}
}
t.Run("original name", func(t *testing.T) {
t.Parallel()
t.Run("left", func(t *testing.T) {
run(t, serf.StatusLeft, nil)
})
t.Run("reap", func(t *testing.T) {
run(t, StatusReap, nil)
})
})
t.Run("uppercased name", func(t *testing.T) {
t.Parallel()
t.Run("left", func(t *testing.T) {
run(t, serf.StatusLeft, strings.ToUpper)
})
t.Run("reap", func(t *testing.T) {
run(t, StatusReap, strings.ToUpper)
})
})
}
func TestLeader_CheckServersMeta(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
ports := freeport.GetN(t, 2) // s3 grpc, s3 grpc_tls
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
c.GRPCPort = ports[0]
c.GRPCTLSPort = ports[1]
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
consulService := &structs.NodeService{
ID: "consul",
Service: "consul",
}
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
if service.Meta["non_voter"] != "false" {
r.Fatalf("Expected to be non_voter == false, was: %s", service.Meta["non_voter"])
}
})
member := serf.Member{}
for _, m := range s1.serfLAN.Members() {
if m.Name == s3.config.NodeName {
member = m
member.Tags = make(map[string]string)
for key, value := range m.Tags {
member.Tags[key] = value
}
}
}
if member.Name != s3.config.NodeName {
t.Fatal("could not find node in serf members")
}
versionToExpect := "19.7.9"
retry.Run(t, func(r *retry.R) {
// DEPRECATED - remove nonvoter tag in favor of read_replica in a future version of consul
member.Tags["nonvoter"] = "1"
member.Tags["read_replica"] = "1"
member.Tags["build"] = versionToExpect
err := s1.registrator.HandleAliveMember(member, nil, s1.joinConsulServer)
if err != nil {
r.Fatalf("Unexpected error :%v", err)
}
_, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
// DEPRECATED - remove non_voter in favor of read_replica in a future version of consul
if service.Meta["non_voter"] != "true" {
r.Fatalf("Expected to be non_voter == true, was: %s", service.Meta["non_voter"])
}
if service.Meta["read_replica"] != "true" {
r.Fatalf("Expected to be read_replica == true, was: %s", service.Meta["non_voter"])
}
newVersion := service.Meta["version"]
if newVersion != versionToExpect {
r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion)
}
grpcPort := service.Meta["grpc_port"]
if grpcPort != strconv.Itoa(ports[0]) {
r.Fatalf("Expected grpc port to be %d, was %s", ports[0], grpcPort)
}
grpcTLSPort := service.Meta["grpc_tls_port"]
if grpcTLSPort != strconv.Itoa(ports[1]) {
r.Fatalf("Expected grpc tls port to be %d, was %s", ports[1], grpcTLSPort)
}
})
}
func TestLeader_ReapServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// call reconcileReaped with a map that does not contain s3
knownMembers := make(map[string]struct{})
knownMembers[s1.config.NodeName] = struct{}{}
knownMembers[s2.config.NodeName] = struct{}{}
err := s1.reconcileReaped(knownMembers, nil)
if err != nil {
t.Fatalf("Unexpected error :%v", err)
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
}
})
}
func TestLeader_Reconcile_ReapMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register a non-existing member
dead := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: "no-longer-around",
Address: "127.1.1.1",
Check: &structs.HealthCheck{
Node: "no-longer-around",
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var out struct{}
if err := s1.RPC(context.Background(), "Catalog.Register", &dead, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconciliation
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
// Node should be gone
state := s1.fsm.State()
_, node, err := state.GetNode("no-longer-around", nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
}
func TestLeader_Reconcile(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Join before we have a leader, this should cause a reconcile!
joinLAN(t, c1, s1)
// Should not be registered
state := s1.fsm.State()
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
}
func TestLeader_Reconcile_Races(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
joinLAN(t, c1, s1)
// Wait for the server to reconcile the client and register it.
state := s1.fsm.State()
var nodeAddr string
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
nodeAddr = node.Address
})
// Add in some metadata via the catalog (as if the agent synced it
// there). We also set the serfHealth check to failing so the reconcile
// will attempt to flip it back
req := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: c1.config.NodeName,
ID: c1.config.NodeID,
Address: nodeAddr,
NodeMeta: map[string]string{"hello": "world"},
Check: &structs.HealthCheck{
Node: c1.config.NodeName,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: "",
},
}
var out struct{}
if err := s1.RPC(context.Background(), "Catalog.Register", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconcile and make sure the metadata stuck around.
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
// Fail the member and wait for the health to go critical.
c1.Shutdown()
retry.Run(t, func(r *retry.R) {
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if len(checks) != 1 {
r.Fatalf("client missing check")
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
// Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
}
func TestLeader_LeftServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s3, s1}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill any server
servers[0].Shutdown()
// Force remove the non-leader (transition to left state)
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Wait until the remaining servers show only 2 peers.
for _, s := range servers[1:] {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
s1.Shutdown()
}
func TestLeader_LeftLeader(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the leader!
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
var remain *Server
for _, s := range servers {
if s == leader {
continue
}
remain = s
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
// Verify the old leader is deregistered
state := remain.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(leader.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatal("leader should be deregistered")
}
})
}
func TestLeader_MultiBootstrap(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
servers := []*Server{s1, s2}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
if got, want := len(s.serfLAN.Members()), 2; got != want {
r.Fatalf("got %d peers want %d", got, want)
}
})
}
// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.autopilot.NumVoters()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
}

View File

@ -0,0 +1,407 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/types"
)
const (
consulWorkloadPrefix = "consul-server-"
consulPortNameServer = "server"
)
var _ ConsulRegistrator = (*V2ConsulRegistrator)(nil)
var resourceCmpOptions = []cmp.Option{
protocmp.IgnoreFields(&pbresource.Resource{}, "status", "generation", "version"),
protocmp.IgnoreFields(&pbresource.ID{}, "uid"),
protocmp.Transform(),
// Stringify any type passed to the sorter so that we can reliably compare most values.
cmpopts.SortSlices(func(a, b any) bool { return fmt.Sprintf("%v", a) < fmt.Sprintf("%v", b) }),
}
type V2ConsulRegistrator struct {
Logger hclog.Logger
NodeName string
EntMeta *acl.EnterpriseMeta
Client pbresource.ResourceServiceClient
}
// HandleAliveMember is used to ensure the server is registered as a Workload
// with a passing health check.
func (r V2ConsulRegistrator) HandleAliveMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, joinServer func(m serf.Member, parts *metadata.Server) error) error {
valid, parts := metadata.IsConsulServer(member)
if !valid {
return nil
}
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
// Attempt to join the consul server, regardless of the existing catalog state
if err := joinServer(member, parts); err != nil {
return err
}
r.Logger.Info("member joined, creating catalog entries",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
workloadResource, err := r.createWorkloadFromMember(member, parts, nodeEntMeta)
if err != nil {
return err
}
// Check if the Workload already exists and if it's the same
res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadResource.Id})
if err != nil && !grpcNotFoundErr(err) {
return fmt.Errorf("error checking for existing Workload %s: %w", workloadResource.Id.Name, err)
}
if err == nil {
existingWorkload := res.GetResource()
r.Logger.Debug("existing Workload matching the member found",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// If the Workload is identical, move to updating the health status
if cmp.Equal(workloadResource, existingWorkload, resourceCmpOptions...) {
r.Logger.Debug("no updates to perform on member Workload",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
goto HEALTHSTATUS
}
// If the existing Workload different, add the existing Version into the patch for CAS write
workloadResource.Id = existingWorkload.Id
workloadResource.Version = existingWorkload.Version
}
if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: workloadResource}); err != nil {
return fmt.Errorf("failed to write Workload %s: %w", workloadResource.Id.Name, err)
}
r.Logger.Info("updated consul Workload in catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
HEALTHSTATUS:
hsResource, err := r.createHealthStatusFromMember(member, workloadResource.Id, true, nodeEntMeta)
if err != nil {
return err
}
// Check if the HealthStatus already exists and if it's the same
res, err = r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: hsResource.Id})
if err != nil && !grpcNotFoundErr(err) {
return fmt.Errorf("error checking for existing HealthStatus %s: %w", hsResource.Id.Name, err)
}
if err == nil {
existingHS := res.GetResource()
r.Logger.Debug("existing HealthStatus matching the member found",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// If the HealthStatus is identical, we're done.
if cmp.Equal(hsResource, existingHS, resourceCmpOptions...) {
r.Logger.Debug("no updates to perform on member HealthStatus",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
// If the existing HealthStatus is different, add the Version to the patch for CAS write.
hsResource.Id = existingHS.Id
hsResource.Version = existingHS.Version
}
if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: hsResource}); err != nil {
return fmt.Errorf("failed to write HealthStatus %s: %w", hsResource.Id.Name, err)
}
r.Logger.Info("updated consul HealthStatus in catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
func (r V2ConsulRegistrator) createWorkloadFromMember(member serf.Member, parts *metadata.Server, nodeEntMeta *acl.EnterpriseMeta) (*pbresource.Resource, error) {
workloadMeta := map[string]string{
"read_replica": strconv.FormatBool(member.Tags["read_replica"] == "1"),
"raft_version": strconv.Itoa(parts.RaftVersion),
"serf_protocol_current": strconv.FormatUint(uint64(member.ProtocolCur), 10),
"serf_protocol_min": strconv.FormatUint(uint64(member.ProtocolMin), 10),
"serf_protocol_max": strconv.FormatUint(uint64(member.ProtocolMax), 10),
"version": parts.Build.String(),
}
if parts.ExternalGRPCPort > 0 {
workloadMeta["grpc_port"] = strconv.Itoa(parts.ExternalGRPCPort)
}
if parts.ExternalGRPCTLSPort > 0 {
workloadMeta["grpc_tls_port"] = strconv.Itoa(parts.ExternalGRPCTLSPort)
}
workload := &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: member.Addr.String(), Ports: []string{consulPortNameServer}},
},
// Don't include identity since Consul is not routable through the mesh.
// Don't include locality because these values are not passed along through serf, and they are probably
// different from the leader's values.
Ports: map[string]*pbcatalog.WorkloadPort{
consulPortNameServer: {
Port: uint32(parts.Port),
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
// TODO: add other agent ports
},
}
workloadData, err := anypb.New(workload)
if err != nil {
return nil, fmt.Errorf("could not convert Workload to 'any' type: %w", err)
}
workloadId := &pbresource.ID{
Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
Type: pbcatalog.WorkloadType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
workloadId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()
return &pbresource.Resource{
Id: workloadId,
Data: workloadData,
Metadata: workloadMeta,
}, nil
}
func (r V2ConsulRegistrator) createHealthStatusFromMember(member serf.Member, workloadId *pbresource.ID, passing bool, nodeEntMeta *acl.EnterpriseMeta) (*pbresource.Resource, error) {
hs := &pbcatalog.HealthStatus{
Type: string(structs.SerfCheckID),
Description: structs.SerfCheckName,
}
if passing {
hs.Status = pbcatalog.Health_HEALTH_PASSING
hs.Output = structs.SerfCheckAliveOutput
} else {
hs.Status = pbcatalog.Health_HEALTH_CRITICAL
hs.Output = structs.SerfCheckFailedOutput
}
hsData, err := anypb.New(hs)
if err != nil {
return nil, fmt.Errorf("could not convert HealthStatus to 'any' type: %w", err)
}
hsId := &pbresource.ID{
Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
Type: pbcatalog.HealthStatusType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
hsId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()
return &pbresource.Resource{
Id: hsId,
Data: hsData,
Owner: workloadId,
}, nil
}
// HandleFailedMember is used to mark the workload's associated HealthStatus.
func (r V2ConsulRegistrator) HandleFailedMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta) error {
if valid, _ := metadata.IsConsulServer(member); !valid {
return nil
}
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
r.Logger.Info("member failed",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// Validate that the associated workload exists
workloadId := &pbresource.ID{
Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
Type: pbcatalog.WorkloadType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
workloadId.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()
res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadId})
if err != nil && !grpcNotFoundErr(err) {
return fmt.Errorf("error checking for existing Workload %s: %w", workloadId.Name, err)
}
if grpcNotFoundErr(err) {
r.Logger.Info("ignoring failed event for member because it does not exist in the catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
// Overwrite the workload ID with the one that has UID populated.
existingWorkload := res.GetResource()
hsResource, err := r.createHealthStatusFromMember(member, existingWorkload.Id, false, nodeEntMeta)
if err != nil {
return err
}
res, err = r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: hsResource.Id})
if err != nil && !grpcNotFoundErr(err) {
return fmt.Errorf("error checking for existing HealthStatus %s: %w", hsResource.Id.Name, err)
}
if err == nil {
existingHS := res.GetResource()
r.Logger.Debug("existing HealthStatus matching the member found",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
// If the HealthStatus is identical, we're done.
if cmp.Equal(hsResource, existingHS, resourceCmpOptions...) {
r.Logger.Debug("no updates to perform on member HealthStatus",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
// If the existing HealthStatus is different, add the Version to the patch for CAS write.
hsResource.Id = existingHS.Id
hsResource.Version = existingHS.Version
}
if _, err := r.Client.Write(context.TODO(), &pbresource.WriteRequest{Resource: hsResource}); err != nil {
return fmt.Errorf("failed to write HealthStatus %s: %w", hsResource.Id.Name, err)
}
r.Logger.Info("updated consul HealthStatus in catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
// HandleLeftMember is used to handle members that gracefully
// left. They are removed if necessary.
func (r V2ConsulRegistrator) HandleLeftMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
return r.handleDeregisterMember("left", member, nodeEntMeta, removeServerFunc)
}
// HandleReapMember is used to handle members that have been
// reaped after a prolonged failure. They are removed from the catalog.
func (r V2ConsulRegistrator) HandleReapMember(member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
return r.handleDeregisterMember("reaped", member, nodeEntMeta, removeServerFunc)
}
// handleDeregisterMember is used to remove a member of a given reason
func (r V2ConsulRegistrator) handleDeregisterMember(reason string, member serf.Member, nodeEntMeta *acl.EnterpriseMeta, removeServerFunc func(m serf.Member) error) error {
if valid, _ := metadata.IsConsulServer(member); !valid {
return nil
}
if nodeEntMeta == nil {
nodeEntMeta = structs.NodeEnterpriseMetaInDefaultPartition()
}
r.Logger.Info("removing member",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
"reason", reason,
)
if err := removeServerFunc(member); err != nil {
return err
}
// Do not remove our self. This can only happen if the current leader
// is leaving. Instead, we should allow a follower to take-over and
// remove us later.
if strings.EqualFold(member.Name, r.NodeName) &&
strings.EqualFold(nodeEntMeta.PartitionOrDefault(), r.EntMeta.PartitionOrDefault()) {
r.Logger.Warn("removing self should be done by follower",
"name", r.NodeName,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
"reason", reason,
)
return nil
}
// Check if the workload exists
workloadID := &pbresource.ID{
Name: fmt.Sprintf("%s%s", consulWorkloadPrefix, types.NodeID(member.Tags["id"])),
Type: pbcatalog.WorkloadType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
workloadID.Tenancy.Partition = nodeEntMeta.PartitionOrDefault()
res, err := r.Client.Read(context.TODO(), &pbresource.ReadRequest{Id: workloadID})
if err != nil && !grpcNotFoundErr(err) {
return fmt.Errorf("error checking for existing Workload %s: %w", workloadID.Name, err)
}
if grpcNotFoundErr(err) {
r.Logger.Info("ignoring reap event for member because it does not exist in the catalog",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return nil
}
existingWorkload := res.GetResource()
// The HealthStatus should be reaped automatically
if _, err := r.Client.Delete(context.TODO(), &pbresource.DeleteRequest{Id: existingWorkload.Id}); err != nil {
return fmt.Errorf("failed to delete Workload %s: %w", existingWorkload.Id.Name, err)
}
r.Logger.Info("deleted consul Workload",
"member", member.Name,
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
)
return err
}
func grpcNotFoundErr(err error) bool {
if err == nil {
return false
}
s, ok := status.FromError(err)
return ok && s.Code() == codes.NotFound
}

View File

@ -0,0 +1,583 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package consul
import (
"fmt"
"net"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/structs"
mockpbresource "github.com/hashicorp/consul/grpcmocks/proto-public/pbresource"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
)
var (
fakeWrappedErr = fmt.Errorf("fake test error")
)
type testCase struct {
name string
member serf.Member
nodeNameOverride string // This is used in the HandleLeftMember test to avoid deregistering ourself
existingWorkload *pbresource.Resource
workloadReadErr bool
workloadWriteErr bool
workloadDeleteErr bool
existingHealthStatus *pbresource.Resource
healthstatusReadErr bool
healthstatusWriteErr bool
mutatedWorkload *pbresource.Resource // leaving one of these out means the mock expects not to have a write/delete called
mutatedHealthStatus *pbresource.Resource
expErr string
}
func Test_HandleAliveMember(t *testing.T) {
t.Parallel()
run := func(t *testing.T, tt testCase) {
client := mockpbresource.NewResourceServiceClient(t)
mockClient := client.EXPECT()
// Build mock expectations based on the order of HandleAliveMember resource calls
setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
setupWriteExpectation(t, mockClient, tt.mutatedWorkload, tt.workloadWriteErr)
if !tt.workloadReadErr && !tt.workloadWriteErr {
// We expect to bail before this read if there is an error earlier in the function
setupReadExpectation(t, mockClient, getTestHealthstatusId(), tt.existingHealthStatus, tt.healthstatusReadErr)
}
setupWriteExpectation(t, mockClient, tt.mutatedHealthStatus, tt.healthstatusWriteErr)
registrator := V2ConsulRegistrator{
Logger: hclog.New(&hclog.LoggerOptions{}),
NodeName: "test-server-1",
Client: client,
}
// Mock join function
var joinMockCalled bool
joinMock := func(_ serf.Member, _ *metadata.Server) error {
joinMockCalled = true
return nil
}
err := registrator.HandleAliveMember(tt.member, acl.DefaultEnterpriseMeta(), joinMock)
if tt.expErr != "" {
require.Contains(t, err.Error(), tt.expErr)
} else {
require.NoError(t, err)
}
require.True(t, joinMockCalled, "the mock join function was not called")
}
tests := []testCase{
{
name: "New alive member",
member: getTestSerfMember(serf.StatusAlive),
mutatedWorkload: getTestWorkload(t),
mutatedHealthStatus: getTestHealthStatus(t, true),
},
{
name: "No updates needed",
member: getTestSerfMember(serf.StatusAlive),
existingWorkload: getTestWorkload(t),
existingHealthStatus: getTestHealthStatus(t, true),
},
{
name: "Existing Workload and HS need to be updated",
member: getTestSerfMember(serf.StatusAlive),
existingWorkload: getTestWorkloadWithPort(t, 8301),
existingHealthStatus: getTestHealthStatus(t, false),
mutatedWorkload: getTestWorkload(t),
mutatedHealthStatus: getTestHealthStatus(t, true),
},
{
name: "Only the HS needs to be updated",
member: getTestSerfMember(serf.StatusAlive),
existingWorkload: getTestWorkload(t),
existingHealthStatus: getTestHealthStatus(t, false),
mutatedHealthStatus: getTestHealthStatus(t, true),
},
{
name: "Error reading Workload",
member: getTestSerfMember(serf.StatusAlive),
workloadReadErr: true,
expErr: "error checking for existing Workload",
},
{
name: "Error writing Workload",
member: getTestSerfMember(serf.StatusAlive),
workloadWriteErr: true,
mutatedWorkload: getTestWorkload(t),
expErr: "failed to write Workload",
},
{
name: "Error reading HealthStatus",
member: getTestSerfMember(serf.StatusAlive),
healthstatusReadErr: true,
mutatedWorkload: getTestWorkload(t),
expErr: "error checking for existing HealthStatus",
},
{
name: "Error writing HealthStatus",
member: getTestSerfMember(serf.StatusAlive),
healthstatusWriteErr: true,
mutatedWorkload: getTestWorkload(t),
mutatedHealthStatus: getTestHealthStatus(t, true),
expErr: "failed to write HealthStatus",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
run(t, tt)
})
}
}
func Test_HandleFailedMember(t *testing.T) {
t.Parallel()
run := func(t *testing.T, tt testCase) {
client := mockpbresource.NewResourceServiceClient(t)
mockClient := client.EXPECT()
// Build mock expectations based on the order of HandleFailed resource calls
setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
if !tt.workloadReadErr && tt.existingWorkload != nil {
// We expect to bail before this read if there is an error earlier in the function or there is no workload
setupReadExpectation(t, mockClient, getTestHealthstatusId(), tt.existingHealthStatus, tt.healthstatusReadErr)
}
setupWriteExpectation(t, mockClient, tt.mutatedHealthStatus, tt.healthstatusWriteErr)
registrator := V2ConsulRegistrator{
Logger: hclog.New(&hclog.LoggerOptions{}),
NodeName: "test-server-1",
Client: client,
}
err := registrator.HandleFailedMember(tt.member, acl.DefaultEnterpriseMeta())
if tt.expErr != "" {
require.Contains(t, err.Error(), tt.expErr)
} else {
require.NoError(t, err)
}
}
tests := []testCase{
{
name: "Update non-existent HealthStatus",
member: getTestSerfMember(serf.StatusFailed),
existingWorkload: getTestWorkload(t),
mutatedHealthStatus: getTestHealthStatus(t, false),
},
{
name: "Underlying Workload does not exist",
member: getTestSerfMember(serf.StatusFailed),
},
{
name: "Update an existing HealthStatus",
member: getTestSerfMember(serf.StatusFailed),
existingWorkload: getTestWorkload(t),
existingHealthStatus: getTestHealthStatus(t, true),
mutatedHealthStatus: getTestHealthStatus(t, false),
},
{
name: "HealthStatus is already critical - no updates needed",
member: getTestSerfMember(serf.StatusFailed),
existingWorkload: getTestWorkload(t),
existingHealthStatus: getTestHealthStatus(t, false),
},
{
name: "Error reading Workload",
member: getTestSerfMember(serf.StatusFailed),
workloadReadErr: true,
expErr: "error checking for existing Workload",
},
{
name: "Error reading HealthStatus",
member: getTestSerfMember(serf.StatusFailed),
existingWorkload: getTestWorkload(t),
healthstatusReadErr: true,
expErr: "error checking for existing HealthStatus",
},
{
name: "Error writing HealthStatus",
member: getTestSerfMember(serf.StatusFailed),
existingWorkload: getTestWorkload(t),
healthstatusWriteErr: true,
mutatedHealthStatus: getTestHealthStatus(t, false),
expErr: "failed to write HealthStatus",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
run(t, tt)
})
}
}
// Test_HandleLeftMember also tests HandleReapMembers, which are the same core logic with some different logs.
func Test_HandleLeftMember(t *testing.T) {
t.Parallel()
run := func(t *testing.T, tt testCase) {
client := mockpbresource.NewResourceServiceClient(t)
mockClient := client.EXPECT()
// Build mock expectations based on the order of HandleLeftMember resource calls
// We check for the override, which we use to skip self de-registration
if tt.nodeNameOverride == "" {
setupReadExpectation(t, mockClient, getTestWorkloadId(), tt.existingWorkload, tt.workloadReadErr)
if tt.existingWorkload != nil && !tt.workloadReadErr {
setupDeleteExpectation(t, mockClient, tt.mutatedWorkload, tt.workloadDeleteErr)
}
}
nodeName := "test-server-2" // This is not the same as the serf node so we don't dergister ourself.
if tt.nodeNameOverride != "" {
nodeName = tt.nodeNameOverride
}
registrator := V2ConsulRegistrator{
Logger: hclog.New(&hclog.LoggerOptions{}),
NodeName: nodeName, // We change this so that we don't deregister ourself
Client: client,
}
// Mock join function
var removeMockCalled bool
removeMock := func(_ serf.Member) error {
removeMockCalled = true
return nil
}
err := registrator.HandleLeftMember(tt.member, acl.DefaultEnterpriseMeta(), removeMock)
if tt.expErr != "" {
require.Contains(t, err.Error(), tt.expErr)
} else {
require.NoError(t, err)
}
require.True(t, removeMockCalled, "the mock remove function was not called")
}
tests := []testCase{
{
name: "Remove member",
member: getTestSerfMember(serf.StatusAlive),
existingWorkload: getTestWorkload(t),
mutatedWorkload: getTestWorkload(t),
},
{
name: "Don't deregister ourself",
member: getTestSerfMember(serf.StatusAlive),
nodeNameOverride: "test-server-1",
},
{
name: "Don't do anything if the Workload is already gone",
member: getTestSerfMember(serf.StatusAlive),
},
{
name: "Remove member regardless of Workload payload",
member: getTestSerfMember(serf.StatusAlive),
existingWorkload: getTestWorkloadWithPort(t, 8301),
mutatedWorkload: getTestWorkload(t),
},
{
name: "Error reading Workload",
member: getTestSerfMember(serf.StatusAlive),
workloadReadErr: true,
expErr: "error checking for existing Workload",
},
{
name: "Error deleting Workload",
member: getTestSerfMember(serf.StatusAlive),
workloadDeleteErr: true,
existingWorkload: getTestWorkloadWithPort(t, 8301),
mutatedWorkload: getTestWorkload(t),
expErr: "failed to delete Workload",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
run(t, tt)
})
}
}
func setupReadExpectation(
t *testing.T,
mockClient *mockpbresource.ResourceServiceClient_Expecter,
expectedId *pbresource.ID,
existingResource *pbresource.Resource,
sendErr bool) {
if sendErr {
mockClient.Read(mock.Anything, mock.Anything).
Return(nil, fakeWrappedErr).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.ReadRequest)
require.True(t, proto.Equal(expectedId, req.Id))
})
} else if existingResource != nil {
mockClient.Read(mock.Anything, mock.Anything).
Return(&pbresource.ReadResponse{
Resource: existingResource,
}, nil).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.ReadRequest)
require.True(t, proto.Equal(expectedId, req.Id))
})
} else {
mockClient.Read(mock.Anything, mock.Anything).
Return(nil, status.Error(codes.NotFound, "not found")).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.ReadRequest)
require.True(t, proto.Equal(expectedId, req.Id))
})
}
}
func setupWriteExpectation(
t *testing.T,
mockClient *mockpbresource.ResourceServiceClient_Expecter,
expectedResource *pbresource.Resource,
sendErr bool) {
// If there is no expected resource, we take that to mean we don't expect any client writes.
if expectedResource == nil {
return
}
if sendErr {
mockClient.Write(mock.Anything, mock.Anything).
Return(nil, fakeWrappedErr).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.WriteRequest)
require.True(t, proto.Equal(expectedResource, req.Resource))
})
} else {
mockClient.Write(mock.Anything, mock.Anything).
Return(nil, nil).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.WriteRequest)
require.True(t, proto.Equal(expectedResource, req.Resource))
})
}
}
func setupDeleteExpectation(
t *testing.T,
mockClient *mockpbresource.ResourceServiceClient_Expecter,
expectedResource *pbresource.Resource,
sendErr bool) {
expectedId := expectedResource.GetId()
if sendErr {
mockClient.Delete(mock.Anything, mock.Anything).
Return(nil, fakeWrappedErr).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.DeleteRequest)
require.True(t, proto.Equal(expectedId, req.Id))
})
} else {
mockClient.Delete(mock.Anything, mock.Anything).
Return(nil, nil).
Once().
Run(func(args mock.Arguments) {
req := args.Get(1).(*pbresource.DeleteRequest)
require.True(t, proto.Equal(expectedId, req.Id))
})
}
}
func getTestWorkload(t *testing.T) *pbresource.Resource {
return getTestWorkloadWithPort(t, 8300)
}
func getTestWorkloadWithPort(t *testing.T, port int) *pbresource.Resource {
workload := &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1", Ports: []string{consulPortNameServer}},
},
Ports: map[string]*pbcatalog.WorkloadPort{
consulPortNameServer: {
Port: uint32(port),
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
},
}
data, err := anypb.New(workload)
require.NoError(t, err)
return &pbresource.Resource{
Id: getTestWorkloadId(),
Data: data,
Metadata: map[string]string{
"read_replica": "false",
"raft_version": "3",
"serf_protocol_current": "2",
"serf_protocol_min": "1",
"serf_protocol_max": "5",
"version": "1.18.0",
"grpc_port": "8502",
},
}
}
func getTestWorkloadId() *pbresource.ID {
return &pbresource.ID{
Tenancy: resource.DefaultNamespacedTenancy(),
Type: pbcatalog.WorkloadType,
Name: "consul-server-72af047d-1857-2493-969e-53614a70b25a",
}
}
func getTestHealthStatus(t *testing.T, passing bool) *pbresource.Resource {
healthStatus := &pbcatalog.HealthStatus{
Type: string(structs.SerfCheckID),
Description: structs.SerfCheckName,
}
if passing {
healthStatus.Status = pbcatalog.Health_HEALTH_PASSING
healthStatus.Output = structs.SerfCheckAliveOutput
} else {
healthStatus.Status = pbcatalog.Health_HEALTH_CRITICAL
healthStatus.Output = structs.SerfCheckFailedOutput
}
data, err := anypb.New(healthStatus)
require.NoError(t, err)
return &pbresource.Resource{
Id: getTestHealthstatusId(),
Data: data,
Owner: getTestWorkloadId(),
}
}
func getTestHealthstatusId() *pbresource.ID {
return &pbresource.ID{
Tenancy: resource.DefaultNamespacedTenancy(),
Type: pbcatalog.HealthStatusType,
Name: "consul-server-72af047d-1857-2493-969e-53614a70b25a",
}
}
func getTestSerfMember(status serf.MemberStatus) serf.Member {
return serf.Member{
Name: "test-server-1",
Addr: net.ParseIP("127.0.0.1"),
Port: 8300,
// representative tags from a local dev deployment of ENT
Tags: map[string]string{
"vsn_min": "2",
"vsn": "2",
"acls": "1",
"ft_si": "1",
"raft_vsn": "3",
"grpc_port": "8502",
"wan_join_port": "8500",
"dc": "dc1",
"segment": "",
"id": "72af047d-1857-2493-969e-53614a70b25a",
"ft_admpart": "1",
"role": "consul",
"build": "1.18.0",
"ft_ns": "1",
"vsn_max": "3",
"bootstrap": "1",
"expect": "1",
"port": "8300",
},
Status: status,
ProtocolMin: 1,
ProtocolMax: 5,
ProtocolCur: 2,
DelegateMin: 2,
DelegateMax: 5,
DelegateCur: 4,
}
}
// Test_ResourceCmpOptions_GeneratedFieldInsensitive makes sure are protocmp options are working as expected.
func Test_ResourceCmpOptions_GeneratedFieldInsensitive(t *testing.T) {
t.Parallel()
res1 := getTestWorkload(t)
res2 := getTestWorkload(t)
// Modify the generated fields
res2.Id.Uid = "123456"
res2.Version = "789"
res2.Generation = "millenial"
res2.Status = map[string]*pbresource.Status{
"foo": {ObservedGeneration: "124"},
}
require.True(t, cmp.Equal(res1, res2, resourceCmpOptions...))
res1.Metadata["foo"] = "bar"
require.False(t, cmp.Equal(res1, res2, resourceCmpOptions...))
}
// Test gRPC Error Codes Conditions
func Test_grpcNotFoundErr(t *testing.T) {
t.Parallel()
tests := []struct {
name string
err error
expected bool
}{
{
name: "Nil Error",
},
{
name: "Nonsense Error",
err: fmt.Errorf("boooooo!"),
},
{
name: "gRPC Permission Denied Error",
err: status.Error(codes.PermissionDenied, "permission denied is not NotFound"),
},
{
name: "gRPC NotFound Error",
err: status.Error(codes.NotFound, "bingo: not found"),
expected: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.expected, grpcNotFoundErr(tt.err))
})
}
}

View File

@ -10,7 +10,6 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"testing"
"time"
@ -24,878 +23,73 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/leafcert"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/freeport"
"github.com/hashicorp/consul/internal/resource"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v2beta1"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
)
func TestLeader_RegisterMember(t *testing.T) {
func enableV2(t *testing.T) func(deps *Deps) {
return func(deps *Deps) {
deps.Experiments = []string{"resource-apis"}
m, _ := leafcert.NewTestManager(t, nil)
deps.LeafCertManager = m
}
}
// Test that Consul service is created in V2.
// In V1, the service is implicitly created - this is covered in leader_registrator_v1_test.go
func Test_InitConsulService(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
dir, s := testServerWithDepsAndConfig(t, enableV2(t),
func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
defer os.RemoveAll(dir)
defer s.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
testrpc.WaitForRaftLeader(t, s.RPC, "dc1", testrpc.WithToken("root"))
// Try to join
joinLAN(t, c1, s1)
client := s.insecureResourceServiceClient
testrpc.WaitForLeader(t, s1.RPC, "dc1")
consulServiceID := &pbresource.ID{
Name: structs.ConsulServiceName,
Type: pbcatalog.ServiceType,
Tenancy: resource.DefaultNamespacedTenancy(),
}
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
res, err := client.Read(context.Background(), &pbresource.ReadRequest{Id: consulServiceID})
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
data := res.GetResource().GetData()
require.NotNil(r, data)
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Status != api.HealthPassing {
t.Fatalf("bad check: %v", checks[0])
}
// Server should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatalf("server not registered")
}
})
// Service should be registered
_, services, err := state.NodeServices(nil, s1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := services.Services["consul"]; !ok {
t.Fatalf("consul service not registered: %v", services)
}
}
func TestLeader_FailedMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Try to join
joinLAN(t, c1, s1)
// Fail the member
c1.Shutdown()
// Should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// Should have a check
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if len(checks) != 1 {
t.Fatalf("client missing check")
}
if checks[0].CheckID != structs.SerfCheckID {
t.Fatalf("bad check: %v", checks[0])
}
if checks[0].Name != structs.SerfCheckName {
t.Fatalf("bad check: %v", checks[0])
}
retry.Run(t, func(r *retry.R) {
_, checks, err = state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if len(checks) != 1 {
r.Fatalf("client missing check")
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got status %q want %q", got, want)
}
})
}
func TestLeader_LeftMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
var service pbcatalog.Service
err = data.UnmarshalTo(&service)
require.NoError(r, err)
require.NotNil(r, node, "client not registered")
// Spot check the Service
require.Equal(r, service.GetWorkloads().GetPrefixes(), []string{consulWorkloadPrefix})
require.GreaterOrEqual(r, len(service.GetPorts()), 1)
//Since we're not running a full agent w/ serf, we can't check for valid endpoints
})
// Node should leave
c1.Leave()
c1.Shutdown()
// Should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(r, err)
require.Nil(r, node, "client still registered")
})
}
func TestLeader_ReapMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Try to join
joinLAN(t, c1, s1)
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(r, err)
require.NotNil(r, node, "client not registered")
})
// Simulate a node reaping
mems := s1.LANMembersInAgentPartition()
var c1mem serf.Member
for _, m := range mems {
if m.Name == c1.config.NodeName {
c1mem = m
c1mem.Status = StatusReap
break
}
}
s1.reconcileCh <- c1mem
// Should be deregistered; we have to poll quickly here because
// anti-entropy will put it back.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
require.NoError(t, err)
if node == nil {
reaped = true
break
}
}
if !reaped {
t.Fatalf("client should not be registered")
}
}
func TestLeader_ReapOrLeftMember_IgnoreSelf(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
run := func(t *testing.T, status serf.MemberStatus, nameFn func(string) string) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
nodeName := s1.config.NodeName
if nameFn != nil {
nodeName = nameFn(nodeName)
}
state := s1.fsm.State()
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(nodeName, nil, "")
require.NoError(r, err)
require.NotNil(r, node, "server not registered")
})
// Simulate THIS node reaping or leaving
mems := s1.LANMembersInAgentPartition()
var s1mem serf.Member
for _, m := range mems {
if strings.EqualFold(m.Name, nodeName) {
s1mem = m
s1mem.Status = status
s1mem.Name = nodeName
break
}
}
s1.reconcileCh <- s1mem
// Should NOT be deregistered; we have to poll quickly here because
// anti-entropy will put it back if it did get deleted.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(nodeName, nil, "")
require.NoError(t, err)
if node == nil {
reaped = true
break
}
}
if reaped {
t.Fatalf("server should still be registered")
}
}
t.Run("original name", func(t *testing.T) {
t.Parallel()
t.Run("left", func(t *testing.T) {
run(t, serf.StatusLeft, nil)
})
t.Run("reap", func(t *testing.T) {
run(t, StatusReap, nil)
})
})
t.Run("uppercased name", func(t *testing.T) {
t.Parallel()
t.Run("left", func(t *testing.T) {
run(t, serf.StatusLeft, strings.ToUpper)
})
t.Run("reap", func(t *testing.T) {
run(t, StatusReap, strings.ToUpper)
})
})
}
func TestLeader_CheckServersMeta(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
ports := freeport.GetN(t, 2) // s3 grpc, s3 grpc_tls
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
c.GRPCPort = ports[0]
c.GRPCTLSPort = ports[1]
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
consulService := &structs.NodeService{
ID: "consul",
Service: "consul",
}
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
if service.Meta["non_voter"] != "false" {
r.Fatalf("Expected to be non_voter == false, was: %s", service.Meta["non_voter"])
}
})
member := serf.Member{}
for _, m := range s1.serfLAN.Members() {
if m.Name == s3.config.NodeName {
member = m
member.Tags = make(map[string]string)
for key, value := range m.Tags {
member.Tags[key] = value
}
}
}
if member.Name != s3.config.NodeName {
t.Fatal("could not find node in serf members")
}
versionToExpect := "19.7.9"
retry.Run(t, func(r *retry.R) {
// DEPRECATED - remove nonvoter tag in favor of read_replica in a future version of consul
member.Tags["nonvoter"] = "1"
member.Tags["read_replica"] = "1"
member.Tags["build"] = versionToExpect
err := s1.handleAliveMember(member, nil)
if err != nil {
r.Fatalf("Unexpected error :%v", err)
}
_, service, err := state.NodeService(nil, s3.config.NodeName, "consul", &consulService.EnterpriseMeta, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if service == nil {
r.Fatal("client not registered")
}
// DEPRECATED - remove non_voter in favor of read_replica in a future version of consul
if service.Meta["non_voter"] != "true" {
r.Fatalf("Expected to be non_voter == true, was: %s", service.Meta["non_voter"])
}
if service.Meta["read_replica"] != "true" {
r.Fatalf("Expected to be read_replica == true, was: %s", service.Meta["non_voter"])
}
newVersion := service.Meta["version"]
if newVersion != versionToExpect {
r.Fatalf("Expected version to be updated to %s, was %s", versionToExpect, newVersion)
}
grpcPort := service.Meta["grpc_port"]
if grpcPort != strconv.Itoa(ports[0]) {
r.Fatalf("Expected grpc port to be %d, was %s", ports[0], grpcPort)
}
grpcTLSPort := service.Meta["grpc_tls_port"]
if grpcTLSPort != strconv.Itoa(ports[1]) {
r.Fatalf("Expected grpc tls port to be %d, was %s", ports[1], grpcTLSPort)
}
})
}
func TestLeader_ReapServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = true
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "allow"
c.Bootstrap = false
})
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Try to join
joinLAN(t, s1, s2)
joinLAN(t, s1, s3)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
testrpc.WaitForLeader(t, s2.RPC, "dc1")
testrpc.WaitForLeader(t, s3.RPC, "dc1")
state := s1.fsm.State()
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
// call reconcileReaped with a map that does not contain s3
knownMembers := make(map[string]struct{})
knownMembers[s1.config.NodeName] = struct{}{}
knownMembers[s2.config.NodeName] = struct{}{}
err := s1.reconcileReaped(knownMembers, nil)
if err != nil {
t.Fatalf("Unexpected error :%v", err)
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatalf("server with id %v should not be registered", s3.config.NodeID)
}
})
}
func TestLeader_Reconcile_ReapMember(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register a non-existing member
dead := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: "no-longer-around",
Address: "127.1.1.1",
Check: &structs.HealthCheck{
Node: "no-longer-around",
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
},
WriteRequest: structs.WriteRequest{
Token: "root",
},
}
var out struct{}
if err := s1.RPC(context.Background(), "Catalog.Register", &dead, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconciliation
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
// Node should be gone
state := s1.fsm.State()
_, node, err := state.GetNode("no-longer-around", nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
}
func TestLeader_Reconcile(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.PrimaryDatacenter = "dc1"
c.ACLsEnabled = true
c.ACLInitialManagementToken = "root"
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
// Join before we have a leader, this should cause a reconcile!
joinLAN(t, c1, s1)
// Should not be registered
state := s1.fsm.State()
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node != nil {
t.Fatalf("client registered")
}
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
})
}
func TestLeader_Reconcile_Races(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, c1 := testClient(t)
defer os.RemoveAll(dir2)
defer c1.Shutdown()
joinLAN(t, c1, s1)
// Wait for the server to reconcile the client and register it.
state := s1.fsm.State()
var nodeAddr string
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node == nil {
r.Fatal("client not registered")
}
nodeAddr = node.Address
})
// Add in some metadata via the catalog (as if the agent synced it
// there). We also set the serfHealth check to failing so the reconcile
// will attempt to flip it back
req := structs.RegisterRequest{
Datacenter: s1.config.Datacenter,
Node: c1.config.NodeName,
ID: c1.config.NodeID,
Address: nodeAddr,
NodeMeta: map[string]string{"hello": "world"},
Check: &structs.HealthCheck{
Node: c1.config.NodeName,
CheckID: structs.SerfCheckID,
Name: structs.SerfCheckName,
Status: api.HealthCritical,
Output: "",
},
}
var out struct{}
if err := s1.RPC(context.Background(), "Catalog.Register", &req, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Force a reconcile and make sure the metadata stuck around.
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
_, node, err := state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
// Fail the member and wait for the health to go critical.
c1.Shutdown()
retry.Run(t, func(r *retry.R) {
_, checks, err := state.NodeChecks(nil, c1.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if len(checks) != 1 {
r.Fatalf("client missing check")
}
if got, want := checks[0].Status, api.HealthCritical; got != want {
r.Fatalf("got state %q want %q", got, want)
}
})
// Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName, nil, "")
if err != nil {
t.Fatalf("err: %v", err)
}
if node == nil {
t.Fatalf("bad")
}
if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
t.Fatalf("bad")
}
}
func TestLeader_LeftServer(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
// Put s1 last so we don't trigger a leader election.
servers := []*Server{s2, s3, s1}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill any server
servers[0].Shutdown()
// Force remove the non-leader (transition to left state)
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil {
t.Fatalf("err: %v", err)
}
// Wait until the remaining servers show only 2 peers.
for _, s := range servers[1:] {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
s1.Shutdown()
}
func TestLeader_LeftLeader(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerDCBootstrap(t, "dc1", false)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
joinLAN(t, s2, s1)
joinLAN(t, s3, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
// Kill the leader!
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
if !leader.isReadyForConsistentReads() {
t.Fatalf("Expected leader to be ready for consistent reads ")
}
leader.Leave()
if leader.isReadyForConsistentReads() {
t.Fatalf("Expected consistent read state to be false ")
}
leader.Shutdown()
time.Sleep(100 * time.Millisecond)
var remain *Server
for _, s := range servers {
if s == leader {
continue
}
remain = s
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 2)) })
}
// Verify the old leader is deregistered
state := remain.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(leader.config.NodeName, nil, "")
if err != nil {
r.Fatalf("err: %v", err)
}
if node != nil {
r.Fatal("leader should be deregistered")
}
})
}
func TestLeader_MultiBootstrap(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServer(t)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
servers := []*Server{s1, s2}
// Try to join
joinLAN(t, s2, s1)
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
if got, want := len(s.serfLAN.Members()), 2; got != want {
r.Fatalf("got %d peers want %d", got, want)
}
})
}
// Ensure we don't have multiple raft peers
for _, s := range servers {
peers, _ := s.autopilot.NumVoters()
if peers != 1 {
t.Fatalf("should only have 1 raft peer!")
}
}
}
func TestLeader_TombstoneGC_Reset(t *testing.T) {

View File

@ -414,6 +414,9 @@ type Server struct {
// Manager to handle starting/stopping go routines when establishing/revoking raft leadership
leaderRoutineManager *routine.Manager
// registrator is an implemenation that translates serf events of Consul servers into catalog events
registrator ConsulRegistrator
// publisher is the EventPublisher to be shared amongst various server components. Events from
// modifications to the FSM, autopilot and others will flow through here. If in the future we
// need Events generated outside of the Server and all its components, then we could move
@ -883,6 +886,24 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server,
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)
// Construct the registrator that makes sense for the catalog version
if s.useV2Resources {
s.registrator = V2ConsulRegistrator{
Logger: serverLogger,
NodeName: s.config.NodeName,
EntMeta: s.config.AgentEnterpriseMeta(),
Client: s.insecureResourceServiceClient,
}
} else {
s.registrator = V1ConsulRegistrator{
Datacenter: s.config.Datacenter,
FSM: s.fsm,
Logger: serverLogger,
NodeName: s.config.NodeName,
RaftApplyFunc: s.raftApplyMsgpack,
}
}
// Start monitoring leadership. This must happen after Serf is set up
// since it can fire events when leadership is obtained.
go s.monitorLeadership()

View File

@ -230,6 +230,12 @@ func testServerDCExpect(t *testing.T, dc string, expect int) (string, *Server) {
}
func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *Server) {
return testServerWithDepsAndConfig(t, nil, configOpts...)
}
// testServerWithDepsAndConfig is similar to testServerWithConfig except that it also allows modifying dependencies.
// This is useful for things like injecting experiment flags.
func testServerWithDepsAndConfig(t *testing.T, depOpts func(*Deps), configOpts ...func(*Config)) (string, *Server) {
var dir string
var srv *Server
@ -251,6 +257,11 @@ func testServerWithConfig(t *testing.T, configOpts ...func(*Config)) (string, *S
var err error
deps = newDefaultDeps(r, config)
if depOpts != nil {
depOpts(&deps)
}
srv, err = newServerWithDeps(r, config, deps)
if err != nil {
r.Fatalf("err: %v", err)

View File

@ -47,6 +47,37 @@ func WaitForLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
})
}
// WaitForRaftLeader is a V2-compatible version of WaitForLeader.
// Unlike WaitForLeader, it requires a token with operator:read access.
func WaitForRaftLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
t.Helper()
flat := flattenOptions(options)
if flat.WaitForAntiEntropySync {
t.Fatalf("WaitForRaftLeader doesn't accept the WaitForAntiEntropySync option")
}
var out structs.RaftConfigurationResponse
retry.Run(t, func(r *retry.R) {
args := &structs.DCSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: flat.Token},
}
if err := rpc(context.Background(), "Operator.RaftGetConfiguration", args, &out); err != nil {
r.Fatalf("Operator.RaftGetConfiguration failed: %v", err)
}
// Don't check the Raft index. With other things are going on in V2 the assumption the index >= 2 is
// no longer valid.
for _, server := range out.Servers {
if server.Leader {
return
}
}
r.Fatalf("No leader")
})
}
// WaitUntilNoLeader ensures no leader is present, useful for testing lost leadership.
func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
t.Helper()