partitions: various refactors to support partitioning the serf LAN pool (#11568)

pull/11574/head
R.B. Boyer 2021-11-15 09:51:14 -06:00 committed by GitHub
parent 86a30e9ce7
commit eb21649f82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 938 additions and 532 deletions

3
.changelog/_1202.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
partitions: **(Enterprise only)** segment serf LAN gossip between nodes in different partitions
```

View File

@ -144,7 +144,20 @@ type delegate interface {
// This is limited to segments and partitions that the node is a member of.
LANMembers(f consul.LANMemberFilter) ([]serf.Member, error)
// GetLANCoordinate returns the coordinate of the node in the LAN gossip pool.
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
//
// NOTE: This assumes coordinates are enabled, so check that before calling.
GetLANCoordinate() (lib.CoordinateSet, error)
// JoinLAN is used to have Consul join the inner-DC pool The target address
@ -1264,6 +1277,7 @@ func segmentConfig(config *config.RuntimeConfig) ([]consul.NetworkSegment, error
var segments []consul.NetworkSegment
for _, s := range config.Segments {
// TODO: use consul.CloneSerfLANConfig(config.SerfLANConfig) here?
serfConf := consul.DefaultConfig().SerfLANConfig
serfConf.MemberlistConfig.BindAddr = s.Bind.IP.String()
@ -1541,10 +1555,6 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error {
// ForceLeave is used to remove a failed node from the cluster
func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) {
a.logger.Info("Force leaving node", "node", node)
// TODO(partitions): merge IsMember into the RemoveFailedNode call.
if ok := a.IsMember(node); !ok {
return fmt.Errorf("agent: No node found with name '%s'", node)
}
err = a.delegate.RemoveFailedNode(node, prune, entMeta)
if err != nil {
a.logger.Warn("Failed to remove node",
@ -1585,18 +1595,6 @@ func (a *Agent) WANMembers() []serf.Member {
return nil
}
// IsMember is used to check if a node with the given nodeName
// is a member
func (a *Agent) IsMember(nodeName string) bool {
for _, m := range a.LANMembersInAgentPartition() {
if m.Name == nodeName {
return true
}
}
return false
}
// StartSync is called once Services and Checks are registered.
// This is called to prevent a race between clients and the anti-entropy routines
func (a *Agent) StartSync() {

View File

@ -193,7 +193,13 @@ func (c *Client) Leave() error {
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
func (c *Client) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {
// TODO(partitions): assert that the partitions match
// Partitions definitely have to match.
if c.config.AgentEnterpriseMeta().PartitionOrDefault() != entMeta.PartitionOrDefault() {
return 0, fmt.Errorf("target partition %q must match client agent partition %q",
entMeta.PartitionOrDefault(),
c.config.AgentEnterpriseMeta().PartitionOrDefault(),
)
}
return c.serf.Join(addrs, true)
}
@ -221,7 +227,10 @@ func (c *Client) LANMembers(filter LANMemberFilter) ([]serf.Member, error) {
return nil, err
}
// TODO(partitions): assert that the partitions match
// Partitions definitely have to match.
if c.config.AgentEnterpriseMeta().PartitionOrDefault() != filter.PartitionOrDefault() {
return nil, fmt.Errorf("partition %q not found", filter.PartitionOrDefault())
}
if !filter.AllSegments && filter.Segment != c.config.Segment {
return nil, fmt.Errorf("segment %q not found", filter.Segment)
@ -232,7 +241,14 @@ func (c *Client) LANMembers(filter LANMemberFilter) ([]serf.Member, error) {
// RemoveFailedNode is used to remove a failed node from the cluster.
func (c *Client) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
// TODO(partitions): assert that the partitions match
// Partitions definitely have to match.
if c.config.AgentEnterpriseMeta().PartitionOrDefault() != entMeta.PartitionOrDefault() {
return fmt.Errorf("client agent in partition %q cannot remove node in different partition %q",
c.config.AgentEnterpriseMeta().PartitionOrDefault(), entMeta.PartitionOrDefault())
}
if !isSerfMember(c.serf, node) {
return fmt.Errorf("agent: No node found with name '%s'", node)
}
if prune {
return c.serf.RemoveFailedNodePrune(node)
}
@ -371,9 +387,21 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}
// GetLANCoordinate returns the coordinate of the node in the LAN gossip pool.
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
//
// NOTE: This assumes coordinates are enabled, so check that before calling.
func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
// TODO(partitions): possibly something here
lan, err := c.serf.GetCoordinate()
if err != nil {
return nil, err
@ -389,3 +417,11 @@ func (c *Client) ReloadConfig(config ReloadableConfig) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst))
return nil
}
func (c *Client) AgentEnterpriseMeta() *structs.EnterpriseMeta {
return c.config.AgentEnterpriseMeta()
}
func (c *Client) agentSegmentName() string {
return c.config.Segment
}

View File

@ -49,11 +49,12 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
segment: c.config.Segment,
server: false,
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
segment: c.config.Segment,
server: false,
partition: c.config.AgentEnterpriseMeta().PartitionOrDefault(),
}
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
@ -65,6 +66,8 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.ReconnectTimeoutOverride = libserf.NewReconnectOverride(c.logger)
enterpriseModifyClientSerfConfigLAN(c.config, conf)
return serf.Create(conf)
}

View File

@ -19,26 +19,28 @@ func TestCloneSerfLANConfig(t *testing.T) {
"Alive",
"AwarenessMaxMultiplier",
"Conflict",
"DNSConfigPath",
"Delegate",
"DelegateProtocolMax",
"DelegateProtocolMin",
"DelegateProtocolVersion",
"DisableTcpPings",
"DisableTcpPingsForNode",
"DNSConfigPath",
"EnableCompression",
"Events",
"GossipToTheDeadTime",
"HandoffQueueDepth",
"IndirectChecks",
"LogOutput",
"Label",
"Logger",
"LogOutput",
"Merge",
"Name",
"Ping",
"ProtocolVersion",
"PushPullInterval",
"RequireNodeNames",
"SkipInboundLabelCheck",
"SuspicionMaxTimeoutMult",
"TCPTimeout",
"Transport",

View File

@ -24,8 +24,9 @@ type Coordinate struct {
logger hclog.Logger
// updates holds pending coordinate updates for the given nodes. This is
// keyed by node:segment so we can get a coordinate for each segment for
// servers, and we only track the latest update per node:segment.
// keyed by partition/node:segment so we can get a coordinate for each
// segment for servers, and we only track the latest update per
// partition/node:segment.
updates map[string]*structs.CoordinateUpdateRequest
// updatesLock synchronizes access to the updates map.
@ -132,7 +133,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
// Since this is a coordinate coming from some place else we harden this
// and look for dimensionality problems proactively.
coord, err := c.srv.serfLAN.GetCoordinate()
coord, err := c.srv.GetMatchingLANCoordinate(args.PartitionOrDefault(), args.Segment)
if err != nil {
return err
}
@ -157,7 +158,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
}
// Add the coordinate to the map of pending updates.
key := fmt.Sprintf("%s:%s", args.Node, args.Segment)
key := fmt.Sprintf("%s/%s:%s", args.PartitionOrDefault(), args.Node, args.Segment)
c.updatesLock.Lock()
c.updates[key] = args
c.updatesLock.Unlock()
@ -174,8 +175,6 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter
return err
}
// TODO(partitions): should we filter any of this out?
var out []structs.DatacenterMap
// Strip the datacenter suffixes from all the node names.
@ -253,8 +252,6 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
return acl.ErrPermissionDenied
}
// TODO(partitions): do we have to add EnterpriseMeta to the reply like in Catalog.ListServices?
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {

View File

@ -1,3 +1,4 @@
//go:build !consulent
// +build !consulent
package consul
@ -12,6 +13,10 @@ func (c *Client) initEnterprise(_ Deps) error {
return nil
}
func enterpriseModifyClientSerfConfigLAN(_ *Config, _ *serf.Config) {
// nothing
}
func (c *Client) startEnterprise() error {
return nil
}
@ -20,10 +25,6 @@ func (c *Client) handleEnterpriseUserEvents(event serf.UserEvent) bool {
return false
}
func (_ *Client) addEnterpriseSerfTags(_ map[string]string) {
// do nothing
}
func (c *Client) enterpriseStats() map[string]map[string]string {
return nil
}

View File

@ -84,6 +84,31 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error {
return errors.New("Namespaces is a Consul Enterprise feature")
}
// setupSerfLAN is used to setup and initialize a Serf for the LAN
func (s *Server) setupSerfLAN(config *Config) error {
var err error
// Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(setupSerfOptions{
Config: config.SerfLANConfig,
EventCh: s.eventChLAN,
SnapshotPath: serfLANSnapshot,
Listener: s.Listener,
WAN: false,
Segment: "",
Partition: "",
})
if err != nil {
return err
}
return nil
}
func (s *Server) shutdownSerfLAN() {
if s.serfLAN != nil {
s.serfLAN.Shutdown()
}
}
func addEnterpriseSerfTags(_ map[string]string, _ *structs.EnterpriseMeta) {
// do nothing
}

View File

@ -87,11 +87,19 @@ func wantRaft(servers []*Server) error {
// joinAddrLAN returns the address other servers can
// use to join the cluster on the LAN interface.
func joinAddrLAN(s *Server) string {
func joinAddrLAN(s clientOrServer) string {
if s == nil {
panic("no server")
panic("no client or server")
}
var port int
switch x := s.(type) {
case *Server:
port = x.config.SerfLANConfig.MemberlistConfig.BindPort
case *Client:
port = x.config.SerfLANConfig.MemberlistConfig.BindPort
default:
panic(fmt.Sprintf("unhandled type %T", s))
}
port := s.config.SerfLANConfig.MemberlistConfig.BindPort
return fmt.Sprintf("127.0.0.1:%d", port)
}
@ -110,6 +118,8 @@ func joinAddrWAN(s *Server) string {
type clientOrServer interface {
JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error)
LANMembersInAgentPartition() []serf.Member
AgentEnterpriseMeta() *structs.EnterpriseMeta
agentSegmentName() string
}
// joinLAN is a convenience function for
@ -117,27 +127,54 @@ type clientOrServer interface {
// member.JoinLAN("127.0.0.1:"+leader.config.SerfLANConfig.MemberlistConfig.BindPort)
func joinLAN(t *testing.T, member clientOrServer, leader *Server) {
t.Helper()
joinLANWithOptions(t, member, leader, true)
}
func joinLANWithNoMembershipChecks(t *testing.T, member clientOrServer, leader *Server) {
t.Helper()
joinLANWithOptions(t, member, leader, false)
}
func joinLANWithOptions(t *testing.T, member clientOrServer, leader *Server, doMembershipChecks bool) {
t.Helper()
if member == nil || leader == nil {
panic("no server")
}
var memberAddr string
switch x := member.(type) {
case *Server:
memberAddr = joinAddrLAN(x)
case *Client:
memberAddr = fmt.Sprintf("127.0.0.1:%d", x.config.SerfLANConfig.MemberlistConfig.BindPort)
}
memberAddr := joinAddrLAN(member)
var (
memberEntMeta = member.AgentEnterpriseMeta()
memberPartition = memberEntMeta.PartitionOrDefault()
memberSegment = member.agentSegmentName()
)
leaderAddr := joinAddrLAN(leader)
if _, err := member.JoinLAN([]string{leaderAddr}, nil); err != nil {
if memberSegment != "" {
leaderAddr = leader.LANSegmentAddr(memberSegment)
}
if _, err := member.JoinLAN([]string{leaderAddr}, memberEntMeta); err != nil {
t.Fatal(err)
}
if !doMembershipChecks {
return
}
f := LANMemberFilter{
Partition: memberPartition,
Segment: memberSegment,
}
retry.Run(t, func(r *retry.R) {
if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) {
leaderView, err := leader.LANMembers(f)
require.NoError(r, err)
if !seeEachOther(leaderView, member.LANMembersInAgentPartition(), leaderAddr, memberAddr) {
r.Fatalf("leader and member cannot see each other on LAN")
}
})
if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) {
leaderView, err := leader.LANMembers(f)
require.NoError(t, err)
if !seeEachOther(leaderView, member.LANMembersInAgentPartition(), leaderAddr, memberAddr) {
t.Fatalf("leader and member cannot see each other on LAN")
}
}
@ -147,6 +184,14 @@ func joinLAN(t *testing.T, member clientOrServer, leader *Server) {
// member.JoinWAN("127.0.0.1:"+leader.config.SerfWANConfig.MemberlistConfig.BindPort)
func joinWAN(t *testing.T, member, leader *Server) {
t.Helper()
joinWANWithOptions(t, member, leader, true)
}
func joinWANWithNoMembershipChecks(t *testing.T, member, leader *Server) {
t.Helper()
joinWANWithOptions(t, member, leader, false)
}
func joinWANWithOptions(t *testing.T, member, leader *Server, doMembershipChecks bool) {
t.Helper()
if member == nil || leader == nil {
panic("no server")
@ -155,6 +200,11 @@ func joinWAN(t *testing.T, member, leader *Server) {
if _, err := member.JoinWAN([]string{leaderAddr}); err != nil {
t.Fatal(err)
}
if !doMembershipChecks {
return
}
retry.Run(t, func(r *retry.R) {
if !seeEachOther(leader.WANMembers(), member.WANMembers(), leaderAddr, memberAddr) {
r.Fatalf("leader and member cannot see each other on WAN")

View File

@ -6,7 +6,6 @@ import (
bexpr "github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/acl"
@ -418,16 +417,7 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
eventName := userEventName(args.Name)
// Fire the event on all LAN segments
segments := m.srv.LANSegments()
var errs error
for name, segment := range segments {
err := segment.UserEvent(eventName, args.Payload, false)
if err != nil {
err = fmt.Errorf("error broadcasting event to segment %q: %v", name, err)
errs = multierror.Append(errs, err)
}
}
return errs
return m.srv.LANSendUserEvent(eventName, args.Payload, false)
}
// KeyringOperation will query the WAN and LAN gossip keyrings of all nodes.
@ -492,14 +482,18 @@ func (m *Internal) KeyringOperation(
func (m *Internal) executeKeyringOpLAN(args *structs.KeyringRequest) []*structs.KeyringResponse {
responses := []*structs.KeyringResponse{}
segments := m.srv.LANSegments()
for name, segment := range segments {
mgr := segment.KeyManager()
_ = m.srv.DoWithLANSerfs(func(poolName, poolKind string, pool *serf.Serf) error {
mgr := pool.KeyManager()
serfResp, err := m.executeKeyringOpMgr(mgr, args)
resp := translateKeyResponseToKeyringResponse(serfResp, m.srv.config.Datacenter, err)
resp.Segment = name
if poolKind == PoolKindSegment {
resp.Segment = poolName
} else {
resp.Partition = poolName
}
responses = append(responses, &resp)
}
return nil
}, nil)
return responses
}

View File

@ -134,13 +134,8 @@ func (s *Server) leaderLoop(stopCh chan struct{}) {
// Fire a user event indicating a new leader
payload := []byte(s.config.NodeName)
for name, segment := range s.LANSegments() {
if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Warn("failed to broadcast new leader event on segment",
"segment", name,
"error", err,
)
}
if err := s.LANSendUserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Warn("failed to broadcast new leader event", "error", err)
}
// Reconcile channel is only used once initial reconcile

View File

@ -0,0 +1,14 @@
//go:build !consulent
// +build !consulent
package consul
import libserf "github.com/hashicorp/consul/lib/serf"
func updateSerfTags(s *Server, key, value string) {
libserf.UpdateTag(s.serfLAN, key, value)
if s.serfWAN != nil {
libserf.UpdateTag(s.serfWAN, key, value)
}
}

View File

@ -17,7 +17,6 @@ import (
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
libserf "github.com/hashicorp/consul/lib/serf"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
@ -1770,14 +1769,6 @@ func TestDatacenterSupportsFederationStates(t *testing.T) {
})
}
func updateSerfTags(s *Server, key, value string) {
libserf.UpdateTag(s.serfLAN, key, value)
if s.serfWAN != nil {
libserf.UpdateTag(s.serfWAN, key, value)
}
}
func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -14,16 +14,10 @@ import (
// ring. We check that the peers are in the same datacenter and abort the
// merge if there is a mis-match.
type lanMergeDelegate struct {
dc string
nodeID types.NodeID
nodeName string
segment string
// TODO(partitions): use server and partition to reject gossip messages
// from nodes in the wrong partition depending upon the role the node is
// playing. For example servers will always be in the default partition,
// but all clients in all partitions should be aware of the servers so that
// general RPC routing works.
dc string
nodeID types.NodeID
nodeName string
segment string
server bool
partition string
}
@ -81,9 +75,8 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
}
}
if segment := m.Tags["segment"]; segment != md.segment {
return fmt.Errorf("Member '%s' part of wrong segment '%s' (expected '%s')",
m.Name, segment, md.segment)
if err := md.enterpriseNotifyMergeMember(m); err != nil {
return err
}
}
return nil

22
agent/consul/merge_oss.go Normal file
View File

@ -0,0 +1,22 @@
//go:build !consulent
// +build !consulent
package consul
import (
"fmt"
"github.com/hashicorp/serf/serf"
)
func (md *lanMergeDelegate) enterpriseNotifyMergeMember(m *serf.Member) error {
if memberPartition := m.Tags["ap"]; memberPartition != "" {
return fmt.Errorf("Member '%s' part of partition '%s'; Partitions are a Consul Enterprise feature",
m.Name, memberPartition)
}
if segment := m.Tags["segment"]; segment != "" {
return fmt.Errorf("Member '%s' part of segment '%s'; Network Segments are a Consul Enterprise feature",
m.Name, segment)
}
return nil
}

View File

@ -0,0 +1,76 @@
//go:build !consulent
// +build !consulent
package consul
import (
"testing"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
)
func TestMerge_OSS_LAN(t *testing.T) {
type testcase struct {
segment string
server bool
partition string
members []*serf.Member
expect string
}
const thisNodeID = "ee954a2f-80de-4b34-8780-97b942a50a99"
run := func(t *testing.T, tc testcase) {
delegate := &lanMergeDelegate{
dc: "dc1",
nodeID: types.NodeID(thisNodeID),
nodeName: "node0",
segment: tc.segment,
server: tc.server,
partition: tc.partition,
}
err := delegate.NotifyMerge(tc.members)
if tc.expect == "" {
require.NoError(t, err)
} else {
testutil.RequireErrorContains(t, err, tc.expect)
}
}
cases := map[string]testcase{
"node in a segment": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
build: "0.7.5",
segment: "alpha",
}),
},
expect: `Member 'node1' part of segment 'alpha'; Network Segments are a Consul Enterprise feature`,
},
"node in a partition": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
build: "0.7.5",
partition: "part1",
}),
},
expect: `Member 'node1' part of partition 'part1'; Partitions are a Consul Enterprise feature`,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}

View File

@ -1,190 +1,232 @@
package consul
import (
"strings"
"testing"
"github.com/hashicorp/consul/types"
uuid "github.com/hashicorp/go-uuid"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
)
func makeNode(dc, name, id string, server bool, build string) *serf.Member {
var role string
if server {
role = "consul"
} else {
role = "node"
func TestMerge_LAN(t *testing.T) {
type testcase struct {
members []*serf.Member
expect string
}
return &serf.Member{
Name: name,
const thisNodeID = "ee954a2f-80de-4b34-8780-97b942a50a99"
run := func(t *testing.T, tc testcase) {
delegate := &lanMergeDelegate{
dc: "dc1",
nodeID: types.NodeID(thisNodeID),
nodeName: "node0",
}
err := delegate.NotifyMerge(tc.members)
if tc.expect == "" {
require.NoError(t, err)
} else {
testutil.RequireErrorContains(t, err, tc.expect)
}
}
cases := map[string]testcase{
"client in the wrong datacenter": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc2",
name: "node1",
server: false,
build: "0.7.5",
}),
},
expect: "wrong datacenter",
},
"server in the wrong datacenter": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc2",
name: "node1",
server: true,
build: "0.7.5",
}),
},
expect: "wrong datacenter",
},
"node ID conflict with delegate's ID": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
id: thisNodeID,
server: true,
build: "0.7.5",
}),
},
expect: "with this agent's ID",
},
"cluster with existing conflicting node IDs": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
id: "6185913b-98d7-4441-bd8f-f7f7d854a4af",
server: true,
build: "0.8.5",
}),
makeTestNode(t, testMember{
dc: "dc1",
name: "node2",
id: "6185913b-98d7-4441-bd8f-f7f7d854a4af",
server: true,
build: "0.9.0",
}),
},
expect: "with member",
},
"cluster with existing conflicting node IDs, but version is old enough to skip the check": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
id: "6185913b-98d7-4441-bd8f-f7f7d854a4af",
server: true,
build: "0.8.5",
}),
makeTestNode(t, testMember{
dc: "dc1",
name: "node2",
id: "6185913b-98d7-4441-bd8f-f7f7d854a4af",
server: true,
build: "0.8.4",
}),
},
expect: "with member",
},
"good cluster": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc1",
name: "node1",
server: true,
build: "0.8.5",
}),
makeTestNode(t, testMember{
dc: "dc1",
name: "node2",
server: true,
build: "0.8.5",
}),
},
expect: "",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}
func TestMerge_WAN(t *testing.T) {
type testcase struct {
members []*serf.Member
expect string
}
run := func(t *testing.T, tc testcase) {
delegate := &wanMergeDelegate{}
err := delegate.NotifyMerge(tc.members)
if tc.expect == "" {
require.NoError(t, err)
} else {
testutil.RequireErrorContains(t, err, tc.expect)
}
}
cases := map[string]testcase{
"not a server": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc2",
name: "node1",
server: false,
build: "0.7.5",
}),
},
expect: "not a server",
},
"good cluster": {
members: []*serf.Member{
makeTestNode(t, testMember{
dc: "dc2",
name: "node1",
server: true,
build: "0.7.5",
}),
makeTestNode(t, testMember{
dc: "dc3",
name: "node2",
server: true,
build: "0.7.5",
}),
},
expect: "",
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
run(t, tc)
})
}
}
type testMember struct {
dc string
name string
id string
server bool
build string
segment string
partition string
}
func (tm testMember) role() string {
if tm.server {
return "consul"
}
return "node"
}
func makeTestNode(t *testing.T, tm testMember) *serf.Member {
if tm.id == "" {
uuid, err := uuid.GenerateUUID()
require.NoError(t, err)
tm.id = uuid
}
m := &serf.Member{
Name: tm.name,
Tags: map[string]string{
"role": role,
"dc": dc,
"id": id,
"role": tm.role(),
"dc": tm.dc,
"id": tm.id,
"port": "8300",
"build": build,
"segment": tm.segment,
"build": tm.build,
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2",
},
}
}
func TestMerge_LAN(t *testing.T) {
t.Parallel()
cases := []struct {
members []*serf.Member
expect string
}{
// Client in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false,
"0.7.5"),
},
expect: "wrong datacenter",
},
// Server in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
true,
"0.7.5"),
},
expect: "wrong datacenter",
},
// Node ID conflict with delegate's ID.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"ee954a2f-80de-4b34-8780-97b942a50a99",
true,
"0.7.5"),
},
expect: "with this agent's ID",
},
// Cluster with existing conflicting node IDs.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true,
"0.8.5"),
makeNode("dc1",
"node2",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true,
"0.9.0"),
},
expect: "with member",
},
// Cluster with existing conflicting node IDs, but version is
// old enough to skip the check.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true,
"0.8.5"),
makeNode("dc1",
"node2",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true,
"0.8.4"),
},
expect: "with member",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true,
"0.8.5"),
makeNode("dc1",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true,
"0.8.5"),
},
expect: "",
},
}
delegate := &lanMergeDelegate{
dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0",
segment: "",
}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}
func TestMerge_WAN(t *testing.T) {
t.Parallel()
cases := []struct {
members []*serf.Member
expect string
}{
// Not a server
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false,
"0.7.5"),
},
expect: "not a server",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true,
"0.7.5"),
makeNode("dc3",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true,
"0.7.5"),
},
expect: "",
},
}
delegate := &wanMergeDelegate{}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
if tm.partition != "" {
m.Tags["ap"] = tm.partition
}
return m
}

View File

@ -1,14 +1,12 @@
//go:build !consulent
// +build !consulent
package consul
import (
"net"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/serf/serf"
"github.com/hashicorp/consul/agent/structs"
)
@ -37,7 +35,7 @@ func (s *Server) setupSegmentRPC() (map[string]net.Listener, error) {
// setupSegments returns an error if any segments are defined since the OSS
// version of Consul doesn't support them.
func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string]net.Listener) error {
func (s *Server) setupSegments(config *Config, rpcListeners map[string]net.Listener) error {
if len(config.Segments) > 0 {
return structs.ErrSegmentsNotSupported
}
@ -48,28 +46,3 @@ func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string
// floodSegments is a NOP in the OSS version of Consul.
func (s *Server) floodSegments(config *Config) {
}
func getSerfMemberEnterpriseMeta(member serf.Member) *structs.EnterpriseMeta {
return structs.NodeEnterpriseMetaInDefaultPartition()
}
// reconcile is used to reconcile the differences between Serf membership and
// what is reflected in our strongly consistent store. Mainly we need to ensure
// all live nodes are registered, all failed nodes are marked as such, and all
// left nodes are deregistered.
func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now())
members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
for _, member := range members {
if err := s.reconcileMember(member); err != nil {
return err
}
knownMembers[member.Name] = struct{}{}
}
// Reconcile any members that have been reaped while we were not the
// leader.
return s.reconcileReaped(knownMembers, nil)
}

View File

@ -121,6 +121,11 @@ var (
ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled")
)
const (
PoolKindPartition = "partition"
PoolKindSegment = "segment"
)
// Server is Consul server which manages the service discovery,
// health checking, DC forwarding, Raft, and multiple Serf pools.
type Server struct {
@ -248,11 +253,15 @@ type Server struct {
// serfLAN is the Serf cluster maintained inside the DC
// which contains all the DC nodes
//
// - If Network Segments are active, this only contains members in the
// default segment.
//
// - If Admin Partitions are active, this only contains members in the
// default partition.
//
serfLAN *serf.Serf
// segmentLAN maps segment names to their Serf cluster
segmentLAN map[string]*serf.Serf
// serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
@ -362,7 +371,6 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
insecureRPCServer: rpc.NewServer(),
tlsConfigurator: flat.TLSConfigurator,
reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(),
tombstoneGC: gc,
serverLookup: NewServerLookup(),
@ -483,10 +491,14 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
// a little gross to be reading the updated config.
// Initialize the WAN Serf if enabled
serfBindPortWAN := -1
if config.SerfWANConfig != nil {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener)
s.serfWAN, err = s.setupSerf(setupSerfOptions{
Config: config.SerfWANConfig,
EventCh: s.eventChWAN,
SnapshotPath: serfWANSnapshot,
WAN: true,
Listener: s.Listener,
})
if err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
@ -497,6 +509,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(wanfed.IngestionAwareTransport)
// See big comment above why we are doing this.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN == 0 {
serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN == 0 {
@ -508,14 +521,13 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
// Initialize the LAN segments before the default LAN Serf so we have
// updated port information to publish there.
if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil {
if err := s.setupSegments(config, segmentListeners); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to setup network segments: %v", err)
}
// Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener)
if err != nil {
if err := s.setupSerfLAN(config); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
}
@ -926,13 +938,7 @@ func (s *Server) Shutdown() error {
s.leaderRoutineManager.StopAll()
}
if s.serfLAN != nil {
s.serfLAN.Shutdown()
}
for _, segment := range s.segmentLAN {
segment.Shutdown()
}
s.shutdownSerfLAN()
if s.serfWAN != nil {
s.serfWAN.Shutdown()
@ -942,6 +948,8 @@ func (s *Server) Shutdown() error {
}
s.router.Shutdown()
// TODO: actually shutdown areas?
if s.raft != nil {
s.raftTransport.Close()
s.raftLayer.Close()
@ -1100,13 +1108,6 @@ func (s *Server) Leave() error {
return nil
}
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {
// TODO(partitions): handle the different partitions
return s.serfLAN.Join(addrs, true)
}
// JoinWAN is used to have Consul join the cross-WAN Consul ring
// The target address should be another node listening on the
// Serf WAN address
@ -1157,7 +1158,9 @@ func (s *Server) AgentLocalMember() serf.Member {
return s.serfLAN.LocalMember()
}
// LANMembersInAgentPartition is used to return the members of the LAN cluster
// LANMembersInAgentPartition returns the LAN members for this agent's
// canonical serf pool. For clients this is the only pool that exists. For
// servers it's the pool in the default segment and the default partition.
func (s *Server) LANMembersInAgentPartition() []serf.Member {
return s.serfLAN.Members()
}
@ -1172,7 +1175,6 @@ func (s *Server) WANMembers() []serf.Member {
// RemoveFailedNode is used to remove a failed node from the cluster.
func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
// TODO(partitions): handle the different partitions
var removeFn func(*serf.Serf, string) error
if prune {
removeFn = (*serf.Serf).RemoveFailedNodePrune
@ -1180,10 +1182,6 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente
removeFn = (*serf.Serf).RemoveFailedNode
}
if err := removeFn(s.serfLAN, node); err != nil {
return err
}
wanNode := node
// The Serf WAN pool stores members as node.datacenter
@ -1191,13 +1189,8 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente
if !strings.HasSuffix(node, "."+s.config.Datacenter) {
wanNode = node + "." + s.config.Datacenter
}
if s.serfWAN != nil {
if err := removeFn(s.serfWAN, wanNode); err != nil {
return err
}
}
return s.removeFailedNodeEnterprise(removeFn, node, wanNode)
return s.removeFailedNode(removeFn, node, wanNode, entMeta)
}
// IsLeader checks if this server is the cluster leader
@ -1213,6 +1206,7 @@ func (s *Server) LeaderLastContact() time.Time {
// KeyManagerLAN returns the LAN Serf keyring manager
func (s *Server) KeyManagerLAN() *serf.KeyManager {
// NOTE: The serfLAN keymanager is shared by all partitions.
return s.serfLAN.KeyManager()
}
@ -1221,15 +1215,8 @@ func (s *Server) KeyManagerWAN() *serf.KeyManager {
return s.serfWAN.KeyManager()
}
// LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf {
segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1)
segments[""] = s.serfLAN
for name, segment := range s.segmentLAN {
segments[name] = segment
}
return segments
func (s *Server) AgentEnterpriseMeta() *structs.EnterpriseMeta {
return s.config.AgentEnterpriseMeta()
}
// inmemCodec is used to do an RPC call without going over a network
@ -1379,10 +1366,25 @@ func (s *Server) Stats() map[string]map[string]string {
stats["serf_wan"] = s.serfWAN.Stats()
}
s.addEnterpriseStats(stats)
return stats
}
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
// GetLANCoordinate returns the coordinate of the node in the LAN gossip
// pool.
//
// - Clients return a single coordinate for the single gossip pool they are
// in (default, segment, or partition).
//
// - Servers return one coordinate for their canonical gossip pool (i.e.
// default partition/segment) and one per segment they are also ancillary
// members of.
//
// NOTE: servers do not emit coordinates for partitioned gossip pools they
// are ancillary members of.
//
// NOTE: This assumes coordinates are enabled, so check that before calling.
func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
lan, err := s.serfLAN.GetCoordinate()
if err != nil {
@ -1390,16 +1392,17 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
}
cs := lib.CoordinateSet{"": lan}
for name, segment := range s.segmentLAN {
c, err := segment.GetCoordinate()
if err != nil {
return nil, err
}
cs[name] = c
if err := s.addEnterpriseLANCoordinates(cs); err != nil {
return nil, err
}
return cs, nil
}
func (s *Server) agentSegmentName() string {
return s.config.Segment
}
// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
func (s *Server) ReloadConfig(config ReloadableConfig) error {

View File

@ -4,18 +4,68 @@
package consul
import (
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error {
// nothing to do for oss
return nil
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {
return s.serfLAN.Join(addrs, true)
}
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}
// removeFailedNode is used to remove a failed node from the cluster
func (s *Server) removeFailedNode(
removeFn func(*serf.Serf, string) error,
node, wanNode string,
entMeta *structs.EnterpriseMeta,
) error {
maybeRemove := func(s *serf.Serf, node string) (bool, error) {
if !isSerfMember(s, node) {
return false, nil
}
return true, removeFn(s, node)
}
foundAny := false
var merr error
if found, err := maybeRemove(s.serfLAN, node); err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err))
} else if found {
foundAny = true
}
if s.serfWAN != nil {
if found, err := maybeRemove(s.serfWAN, wanNode); err != nil {
merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from WAN: %w", err))
} else if found {
foundAny = true
}
}
if merr != nil {
return merr
}
if !foundAny {
return fmt.Errorf("agent: No node found with name '%s'", node)
}
return nil
}
// lanPoolAllMembers only returns our own segment or partition's members, because
// OSS servers can't be in multiple segments or partitions.
@ -39,3 +89,62 @@ func (s *Server) LANMembers(filter LANMemberFilter) ([]serf.Member, error) {
}
return s.LANMembersInAgentPartition(), nil
}
func (s *Server) GetMatchingLANCoordinate(_, _ string) (*coordinate.Coordinate, error) {
return s.serfLAN.GetCoordinate()
}
func (s *Server) addEnterpriseLANCoordinates(cs lib.CoordinateSet) error {
return nil
}
func (s *Server) LANSendUserEvent(name string, payload []byte, coalesce bool) error {
err := s.serfLAN.UserEvent(name, payload, coalesce)
if err != nil {
return fmt.Errorf("error broadcasting event: %w", err)
}
return nil
}
func (s *Server) DoWithLANSerfs(
fn func(name, poolKind string, pool *serf.Serf) error,
errorFn func(name, poolKind string, err error) error,
) error {
if errorFn == nil {
errorFn = func(_, _ string, err error) error { return err }
}
err := fn("", "", s.serfLAN)
if err != nil {
return errorFn("", "", err)
}
return nil
}
// reconcile is used to reconcile the differences between Serf membership and
// what is reflected in our strongly consistent store. Mainly we need to ensure
// all live nodes are registered, all failed nodes are marked as such, and all
// left nodes are deregistered.
func (s *Server) reconcile() (err error) {
defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now())
members := s.serfLAN.Members()
knownMembers := make(map[string]struct{})
for _, member := range members {
if err := s.reconcileMember(member); err != nil {
return err
}
knownMembers[member.Name] = struct{}{}
}
// Reconcile any members that have been reaped while we were not the
// leader.
return s.reconcileReaped(knownMembers, nil)
}
func (s *Server) addEnterpriseStats(stats map[string]map[string]string) {
// no-op
}
func getSerfMemberEnterpriseMeta(member serf.Member) *structs.EnterpriseMeta {
return structs.NodeEnterpriseMetaInDefaultPartition()
}

View File

@ -1,6 +1,7 @@
package consul
import (
"errors"
"fmt"
"net"
"path/filepath"
@ -32,29 +33,69 @@ const (
maxPeerRetries = 6
)
type setupSerfOptions struct {
Config *serf.Config
EventCh chan serf.Event
SnapshotPath string
Listener net.Listener
// WAN only
WAN bool
// LAN only
Segment string
Partition string
}
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int,
segment string, listener net.Listener) (*serf.Serf, error) {
func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, error) {
conf, err := s.setupSerfConfig(opts)
if err != nil {
return nil, err
}
return serf.Create(conf)
}
func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) {
if opts.Config == nil {
return nil, errors.New("serf config is a required field")
}
if opts.Listener == nil {
return nil, errors.New("listener is a required field")
}
if opts.WAN {
if opts.Segment != "" {
return nil, errors.New("cannot configure segments on the WAN serf pool")
}
if opts.Partition != "" {
return nil, errors.New("cannot configure partitions on the WAN serf pool")
}
}
conf := opts.Config
conf.Init()
if wan {
if opts.WAN {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
if wanPort > 0 {
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
if s.config.SerfWANConfig != nil {
serfBindPortWAN := s.config.SerfWANConfig.MemberlistConfig.BindPort
if serfBindPortWAN > 0 {
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", serfBindPortWAN)
}
}
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["segment"] = segment
conf.Tags["segment"] = opts.Segment
conf.Tags["id"] = string(s.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
addr := listener.Addr().(*net.TCPAddr)
addr := opts.Listener.Addr().(*net.TCPAddr)
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
@ -87,7 +128,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.Tags["ft_si"] = "1"
var subLoggerName string
if wan {
if opts.WAN {
subLoggerName = logging.WAN
} else {
subLoggerName = logging.LAN
@ -107,22 +148,23 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
conf.MemberlistConfig.Logger = memberlistLogger
conf.Logger = serfLogger
conf.EventCh = ch
conf.EventCh = opts.EventCh
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
if opts.WAN {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
segment: segment,
server: true,
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
segment: opts.Segment,
partition: opts.Partition,
server: true,
}
}
if wan {
if opts.WAN {
nt, err := memberlist.NewNetTransport(&memberlist.NetTransportConfig{
BindAddrs: []string{conf.MemberlistConfig.BindAddr},
BindPort: conf.MemberlistConfig.BindPort,
@ -154,7 +196,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if wan && s.config.ConnectMeshGatewayWANFederationEnabled {
if opts.WAN && s.config.ConnectMeshGatewayWANFederationEnabled {
conf.MemberlistConfig.RequireNodeNames = true
conf.MemberlistConfig.DisableTcpPingsForNode = func(nodeName string) bool {
_, dc, err := wanfed.SplitNodeName(nodeName)
@ -169,7 +211,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
}
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
conf.SnapshotPath = filepath.Join(s.config.DataDir, opts.SnapshotPath)
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
@ -183,7 +225,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
s.config.OverrideInitialSerfTags(conf.Tags)
}
return serf.Create(conf)
return conf, nil
}
// userEventName computes the name of a user event

View File

@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -389,35 +388,38 @@ func TestServer_JoinLAN(t *testing.T) {
})
}
// TestServer_JoinLAN_SerfAllowedCIDRs test that IPs might be blocked
// with Serf.
// To run properly, this test requires to be able to bind and have access
// on 127.0.1.1 which is the case for most Linux machines and Windows,
// so Unit test will run in the CI.
// To run it on Mac OS, please run this commandd first, otherwise the
// test will be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up`
// TestServer_JoinLAN_SerfAllowedCIDRs test that IPs might be blocked with
// Serf.
//
// To run properly, this test requires to be able to bind and have access on
// 127.0.1.1 which is the case for most Linux machines and Windows, so Unit
// test will run in the CI.
//
// To run it on Mac OS, please run this command first, otherwise the test will
// be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up`
func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) {
t.Parallel()
const targetAddr = "127.0.1.1"
skipIfCannotBindToIP(t, targetAddr)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.BootstrapExpect = 1
lan, err := memberlist.ParseCIDRs([]string{"127.0.0.1/32"})
assert.NoError(t, err)
require.NoError(t, err)
c.SerfLANConfig.MemberlistConfig.CIDRsAllowed = lan
wan, err := memberlist.ParseCIDRs([]string{"127.0.0.0/24", "::1/128"})
assert.NoError(t, err)
require.NoError(t, err)
c.SerfWANConfig.MemberlistConfig.CIDRsAllowed = wan
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
targetAddr := "127.0.1.1"
dir2, a2, err := testClientWithConfigWithErr(t, func(c *Config) {
dir2, a2 := testClientWithConfig(t, func(c *Config) {
c.SerfLANConfig.MemberlistConfig.BindAddr = targetAddr
})
defer os.RemoveAll(dir2)
if err != nil {
t.Skipf("Cannot bind on %s, to run on Mac OS: `sudo ifconfig lo0 alias 127.0.1.1 up`", targetAddr)
}
defer a2.Shutdown()
dir3, rs3 := testServerWithConfig(t, func(c *Config) {
@ -451,6 +453,80 @@ func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) {
})
}
// TestServer_JoinWAN_SerfAllowedCIDRs test that IPs might be
// blocked with Serf.
//
// To run properly, this test requires to be able to bind and have access on
// 127.0.1.1 which is the case for most Linux machines and Windows, so Unit
// test will run in the CI.
//
// To run it on Mac OS, please run this command first, otherwise the test will
// be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up`
func TestServer_JoinWAN_SerfAllowedCIDRs(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
const targetAddr = "127.0.1.1"
skipIfCannotBindToIP(t, targetAddr)
wanCIDRs, err := memberlist.ParseCIDRs([]string{"127.0.0.1/32"})
require.NoError(t, err)
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.BootstrapExpect = 1
c.Datacenter = "dc1"
c.SerfWANConfig.MemberlistConfig.CIDRsAllowed = wanCIDRs
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
waitForLeaderEstablishment(t, s1)
testrpc.WaitForLeader(t, s1.RPC, "dc1")
dir2, s2 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = true
c.BootstrapExpect = 1
c.PrimaryDatacenter = "dc1"
c.Datacenter = "dc2"
c.SerfWANConfig.MemberlistConfig.BindAddr = targetAddr
})
defer os.RemoveAll(dir2)
defer s2.Shutdown()
waitForLeaderEstablishment(t, s2)
testrpc.WaitForLeader(t, s2.RPC, "dc2")
// Joining should be fine
joinWANWithNoMembershipChecks(t, s2, s1)
// But membership is blocked if you go and take a peek on the server.
t.Run("LAN membership should only show each other", func(t *testing.T) {
require.Len(t, s1.LANMembersInAgentPartition(), 1)
require.Len(t, s2.LANMembersInAgentPartition(), 1)
})
t.Run("WAN membership in the primary should not show the secondary", func(t *testing.T) {
require.Len(t, s1.WANMembers(), 1)
})
t.Run("WAN membership in the secondary can show the primary", func(t *testing.T) {
require.Len(t, s2.WANMembers(), 2)
})
}
func skipIfCannotBindToIP(t *testing.T, ip string) {
ports := freeport.MustTake(1)
defer freeport.Return(ports)
addr := ipaddr.FormatAddressPort(ip, ports[0])
l, err := net.Listen("tcp", addr)
l.Close()
if err != nil {
t.Skipf("Cannot bind on %s, to run on Mac OS: `sudo ifconfig lo0 alias %s up`", ip, ip)
}
}
func TestServer_LANReap(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -158,3 +158,12 @@ func (c *Client) CheckServers(datacenter string, fn func(*metadata.Server) bool)
c.router.CheckServers(datacenter, fn)
}
func isSerfMember(s *serf.Serf, nodeName string) bool {
for _, m := range s.Members() {
if m.Name == nodeName {
return true
}
}
return false
}

View File

@ -8,11 +8,12 @@ import (
"testing"
"time"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/serf/coordinate"
)
func TestCoordinate_Disabled_Response(t *testing.T) {
@ -137,7 +138,6 @@ func TestCoordinate_Nodes(t *testing.T) {
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Segment: "alpha",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
}
var out struct{}
@ -178,65 +178,6 @@ func TestCoordinate_Nodes(t *testing.T) {
r.Fatalf("expected: bar, foo received: %v", coordinates)
}
})
// Filter on a nonexistent node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=nope", nil)
resp = httptest.NewRecorder()
retry.Run(t, func(r *retry.R) {
obj, err := a.srv.CoordinateNodes(resp, req)
if err != nil {
r.Fatalf("err: %v", err)
}
if resp.Code != http.StatusOK {
r.Fatalf("bad: %v", resp.Code)
}
coordinates, ok := obj.(structs.Coordinates)
if !ok {
r.Fatalf("expected: structs.Coordinates, received: %+v", obj)
}
if len(coordinates) != 0 {
r.Fatalf("coordinates should be empty, received: %v", coordinates)
}
})
// Filter on a real node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=alpha", nil)
resp = httptest.NewRecorder()
retry.Run(t, func(r *retry.R) {
obj, err := a.srv.CoordinateNodes(resp, req)
if err != nil {
r.Fatalf("err: %v", err)
}
if resp.Code != http.StatusOK {
r.Fatalf("bad: %v", resp.Code)
}
coordinates, ok := obj.(structs.Coordinates)
if !ok {
r.Fatalf("expected: structs.Coordinates, received: %+v", obj)
}
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
r.Fatalf("expected: foo received: %v", coordinates)
}
})
// Make sure the empty filter works
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=", nil)
resp = httptest.NewRecorder()
retry.Run(t, func(r *retry.R) {
obj, err := a.srv.CoordinateNodes(resp, req)
if err != nil {
r.Fatalf("err: %v", err)
}
coordinates, ok := obj.(structs.Coordinates)
if !ok {
r.Fatalf("expected: structs.Coordinates, received: %+v", obj)
}
if len(coordinates) != 1 || coordinates[0].Node != "bar" {
r.Fatalf("expected: bar received: %v", coordinates)
}
})
}
func TestCoordinate_Node(t *testing.T) {
@ -280,7 +221,6 @@ func TestCoordinate_Node(t *testing.T) {
arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "foo",
Segment: "alpha",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
}
var out struct{}
@ -315,45 +255,6 @@ func TestCoordinate_Node(t *testing.T) {
coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a nonexistent node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=nope", nil)
resp = httptest.NewRecorder()
_, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != http.StatusNotFound {
t.Fatalf("bad: %v", resp.Code)
}
// Filter on a real node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=alpha", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != http.StatusOK {
t.Fatalf("bad: %v", resp.Code)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Make sure the empty filter works
req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=", nil)
resp = httptest.NewRecorder()
_, err = a.srv.CoordinateNode(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != http.StatusNotFound {
t.Fatalf("bad: %v", resp.Code)
}
}
func TestCoordinate_Update(t *testing.T) {

View File

@ -2042,6 +2042,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
}
t.Parallel()
a := agent.StartTestAgent(t, agent.TestAgent{Overrides: `
sync_coordinate_interval_min = "1ms"
sync_coordinate_rate_target = 10.0

View File

@ -32,6 +32,7 @@ type Server struct {
SegmentAddrs map[string]string
SegmentPorts map[string]int
WanJoinPort int
LanJoinPort int
Bootstrap bool
Expect int
Build version.Version
@ -168,6 +169,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
SegmentAddrs: segmentAddrs,
SegmentPorts: segmentPorts,
WanJoinPort: wanJoinPort,
LanJoinPort: int(m.Port),
Bootstrap: bootstrap,
Expect: expect,
Addr: addr,

View File

@ -6,11 +6,12 @@ import (
"strconv"
"time"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)
// OperatorRaftConfiguration is used to inspect the current Raft configuration.
@ -172,6 +173,11 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error {
if response.WAN {
pool = "WAN"
}
if response.Segment != "" {
pool += " [segment: " + response.Segment + "]"
} else if !structs.IsDefaultPartition(response.Partition) {
pool += " [partition: " + response.Partition + "]"
}
errs = multierror.Append(errs, fmt.Errorf("%s error: %s", pool, response.Error))
for key, message := range response.Messages {
errs = multierror.Append(errs, fmt.Errorf("%s: %s", key, message))

View File

@ -2591,6 +2591,7 @@ type KeyringResponse struct {
WAN bool
Datacenter string
Segment string
Partition string `json:",omitempty"`
Messages map[string]string `json:",omitempty"`
Keys map[string]int
PrimaryKeys map[string]int
@ -2598,6 +2599,10 @@ type KeyringResponse struct {
Error string `json:",omitempty"`
}
func (r *KeyringResponse) PartitionOrDefault() string {
return PartitionOrDefault(r.Partition)
}
// KeyringResponses holds multiple responses to keyring queries. Each
// datacenter replies independently, and KeyringResponses is used as a
// container for the set of all responses.

View File

@ -10,7 +10,7 @@ require (
github.com/hashicorp/go-hclog v0.12.0
github.com/hashicorp/go-rootcerts v1.0.2
github.com/hashicorp/go-uuid v1.0.1
github.com/hashicorp/serf v0.9.5
github.com/hashicorp/serf v0.9.6
github.com/mitchellh/mapstructure v1.1.2
github.com/stretchr/testify v1.4.0
)

View File

@ -36,11 +36,11 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg2DmyNY=
github.com/hashicorp/memberlist v0.2.2 h1:5+RffWKwqJ71YPu9mWsF7ZOscZmwfasdA8kbdC7AO2g=
github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM=
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA=
github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc=
github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@ -56,9 +56,9 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU=
github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso=
github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY=
github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
@ -83,20 +83,18 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M=
golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY=
golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 h1:4qWs8cYYH6PoEFy4dfhDFgoMGkwAcETd+MmPdCPMzUc=
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -105,10 +103,16 @@ golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@ -16,6 +16,9 @@ type KeyringResponse struct {
// Segment has the network segment this request corresponds to.
Segment string
// Partition has the admin partition this request corresponds to.
Partition string `json:",omitempty"`
// Messages has information or errors from serf
Messages map[string]string `json:",omitempty"`

View File

@ -18,7 +18,6 @@ func TestForceLeaveCommand_noTabs(t *testing.T) {
}
}
// TODO(partitions): split this test and verify it works in partitions
func TestForceLeaveCommand(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
@ -62,7 +61,6 @@ func TestForceLeaveCommand(t *testing.T) {
})
}
// TODO(partitions): split this test and verify it works in partitions
func TestForceLeaveCommand_NoNodeWithName(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")

View File

@ -16,8 +16,7 @@ func TestJoinCommand_noTabs(t *testing.T) {
}
}
// TODO(partitions): split this test and verify it works in partitions
func TestJoinCommandJoin_lan(t *testing.T) {
func TestJoinCommandJoin_LAN(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

View File

@ -5,10 +5,12 @@ import (
"fmt"
"strings"
"github.com/mitchellh/cli"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/structs"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/command/flags"
"github.com/mitchellh/cli"
)
func New(ui cli.Ui) *cmd {
@ -185,21 +187,25 @@ func (c *cmd) Run(args []string) int {
func formatResponse(response *consulapi.KeyringResponse, keys map[string]int) string {
b := new(strings.Builder)
b.WriteString("\n")
b.WriteString(poolName(response.Datacenter, response.WAN, response.Segment))
b.WriteString(poolName(response.Datacenter, response.WAN, response.Partition, response.Segment))
b.WriteString(formatMessages(response.Messages))
b.WriteString(formatKeys(keys, response.NumNodes))
return strings.TrimRight(b.String(), "\n")
}
func poolName(dc string, wan bool, segment string) string {
func poolName(dc string, wan bool, partition, segment string) string {
pool := fmt.Sprintf("%s (LAN)", dc)
if wan {
pool = "WAN"
}
var suffix string
if segment != "" {
segment = fmt.Sprintf(" [%s]", segment)
suffix = fmt.Sprintf(" [%s]", segment)
} else if !structs.IsDefaultPartition(partition) {
suffix = fmt.Sprintf(" [partition: %s]", partition)
}
return fmt.Sprintf("%s%s:\n", pool, segment)
return fmt.Sprintf("%s%s:\n", pool, suffix)
}
func formatMessages(messages map[string]string) string {

View File

@ -4,10 +4,11 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/agent"
consulapi "github.com/hashicorp/consul/api"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent"
consulapi "github.com/hashicorp/consul/api"
)
func TestKeyringCommand_noTabs(t *testing.T) {
@ -195,9 +196,8 @@ func removeKey(t *testing.T, addr string, key string) {
}
func TestKeyringCommand_poolName(t *testing.T) {
require.Equal(t, "dc1 (LAN):\n", poolName("dc1", false, ""))
require.Equal(t, "dc1 (LAN) [segment1]:\n", poolName("dc1", false, "segment1"))
require.Equal(t, "WAN:\n", poolName("dc1", true, ""))
require.Equal(t, "dc1 (LAN):\n", poolName("dc1", false, "", ""))
require.Equal(t, "WAN:\n", poolName("dc1", true, "", ""))
}
func TestKeyringCommand_formatKeys(t *testing.T) {

View File

@ -1,6 +1,7 @@
package members
import (
"encoding/csv"
"fmt"
"math/rand"
"sort"
@ -175,6 +176,35 @@ func TestMembersCommand_verticalBar(t *testing.T) {
}
}
func decodeOutput(t *testing.T, data string) []map[string]string {
r := csv.NewReader(strings.NewReader(data))
r.Comma = ' '
r.TrimLeadingSpace = true
lines, err := r.ReadAll()
require.NoError(t, err)
if len(lines) < 2 {
return nil
}
var out []map[string]string
for i := 1; i < len(lines); i++ {
m := zip(t, lines[0], lines[i])
out = append(out, m)
}
return out
}
func zip(t *testing.T, k, v []string) map[string]string {
require.Equal(t, len(k), len(v))
m := make(map[string]string)
for i := 0; i < len(k); i++ {
m[k[i]] = v[i]
}
return m
}
func TestSortByMemberNamePartitionAndSegment(t *testing.T) {
lib.SeedMathRand()

5
go.mod
View File

@ -49,13 +49,12 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
github.com/hashicorp/mdns v1.0.4 // indirect
github.com/hashicorp/memberlist v0.2.4
github.com/hashicorp/memberlist v0.3.0
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.3.2
github.com/hashicorp/raft-autopilot v0.1.5
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9
github.com/hashicorp/serf v0.9.6
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086
github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267
github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493

10
go.sum
View File

@ -278,9 +278,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO
github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg2DmyNY=
github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ=
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/memberlist v0.2.4 h1:OOhYzSvFnkFQXm1ysE8RjXTHsqSRDyP4emusC9K7DYg=
github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA=
github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
@ -291,9 +290,8 @@ github.com/hashicorp/raft-autopilot v0.1.5 h1:onEfMH5uHVdXQqtas36zXUHEZxLdsJVu/n
github.com/hashicorp/raft-autopilot v0.1.5/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk=
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 h1:lCZfMBDn/Puwg9VosHMf/9p9jNDYYkbzVjb4jYjVfqU=
github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9/go.mod h1:qapjppkpNXHYTyzx+HqkyWGGkmUxafHjuspm/Bqb2Jc=
github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc=
github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4=
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 h1:OKsyxKi2sNmqm1Gv93adf2AID2FOBFdCbbZn9fGtIdg=
github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086/go.mod h1:R3Umvhlxi2TN7Ex2hzOowyeNb+SfbVWI973N+ctaFMk=
github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267 h1:e1ok06zGrWJW91rzRroyl5nRNqraaBe4d5hiKcVZuHM=