diff --git a/.changelog/_1202.txt b/.changelog/_1202.txt new file mode 100644 index 0000000000..8b6e0da52e --- /dev/null +++ b/.changelog/_1202.txt @@ -0,0 +1,3 @@ +```release-note:feature +partitions: **(Enterprise only)** segment serf LAN gossip between nodes in different partitions +``` diff --git a/agent/agent.go b/agent/agent.go index 9e9cf5c213..65a012685d 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -144,7 +144,20 @@ type delegate interface { // This is limited to segments and partitions that the node is a member of. LANMembers(f consul.LANMemberFilter) ([]serf.Member, error) - // GetLANCoordinate returns the coordinate of the node in the LAN gossip pool. + // GetLANCoordinate returns the coordinate of the node in the LAN gossip + // pool. + // + // - Clients return a single coordinate for the single gossip pool they are + // in (default, segment, or partition). + // + // - Servers return one coordinate for their canonical gossip pool (i.e. + // default partition/segment) and one per segment they are also ancillary + // members of. + // + // NOTE: servers do not emit coordinates for partitioned gossip pools they + // are ancillary members of. + // + // NOTE: This assumes coordinates are enabled, so check that before calling. GetLANCoordinate() (lib.CoordinateSet, error) // JoinLAN is used to have Consul join the inner-DC pool The target address @@ -1264,6 +1277,7 @@ func segmentConfig(config *config.RuntimeConfig) ([]consul.NetworkSegment, error var segments []consul.NetworkSegment for _, s := range config.Segments { + // TODO: use consul.CloneSerfLANConfig(config.SerfLANConfig) here? serfConf := consul.DefaultConfig().SerfLANConfig serfConf.MemberlistConfig.BindAddr = s.Bind.IP.String() @@ -1541,10 +1555,6 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error { // ForceLeave is used to remove a failed node from the cluster func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) { a.logger.Info("Force leaving node", "node", node) - // TODO(partitions): merge IsMember into the RemoveFailedNode call. - if ok := a.IsMember(node); !ok { - return fmt.Errorf("agent: No node found with name '%s'", node) - } err = a.delegate.RemoveFailedNode(node, prune, entMeta) if err != nil { a.logger.Warn("Failed to remove node", @@ -1585,18 +1595,6 @@ func (a *Agent) WANMembers() []serf.Member { return nil } -// IsMember is used to check if a node with the given nodeName -// is a member -func (a *Agent) IsMember(nodeName string) bool { - for _, m := range a.LANMembersInAgentPartition() { - if m.Name == nodeName { - return true - } - } - - return false -} - // StartSync is called once Services and Checks are registered. // This is called to prevent a race between clients and the anti-entropy routines func (a *Agent) StartSync() { diff --git a/agent/consul/client.go b/agent/consul/client.go index 031308e19e..ac64d704ab 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -193,7 +193,13 @@ func (c *Client) Leave() error { // JoinLAN is used to have Consul join the inner-DC pool The target address // should be another node inside the DC listening on the Serf LAN address func (c *Client) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) { - // TODO(partitions): assert that the partitions match + // Partitions definitely have to match. + if c.config.AgentEnterpriseMeta().PartitionOrDefault() != entMeta.PartitionOrDefault() { + return 0, fmt.Errorf("target partition %q must match client agent partition %q", + entMeta.PartitionOrDefault(), + c.config.AgentEnterpriseMeta().PartitionOrDefault(), + ) + } return c.serf.Join(addrs, true) } @@ -221,7 +227,10 @@ func (c *Client) LANMembers(filter LANMemberFilter) ([]serf.Member, error) { return nil, err } - // TODO(partitions): assert that the partitions match + // Partitions definitely have to match. + if c.config.AgentEnterpriseMeta().PartitionOrDefault() != filter.PartitionOrDefault() { + return nil, fmt.Errorf("partition %q not found", filter.PartitionOrDefault()) + } if !filter.AllSegments && filter.Segment != c.config.Segment { return nil, fmt.Errorf("segment %q not found", filter.Segment) @@ -232,7 +241,14 @@ func (c *Client) LANMembers(filter LANMemberFilter) ([]serf.Member, error) { // RemoveFailedNode is used to remove a failed node from the cluster. func (c *Client) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error { - // TODO(partitions): assert that the partitions match + // Partitions definitely have to match. + if c.config.AgentEnterpriseMeta().PartitionOrDefault() != entMeta.PartitionOrDefault() { + return fmt.Errorf("client agent in partition %q cannot remove node in different partition %q", + c.config.AgentEnterpriseMeta().PartitionOrDefault(), entMeta.PartitionOrDefault()) + } + if !isSerfMember(c.serf, node) { + return fmt.Errorf("agent: No node found with name '%s'", node) + } if prune { return c.serf.RemoveFailedNodePrune(node) } @@ -371,9 +387,21 @@ func (c *Client) Stats() map[string]map[string]string { return stats } -// GetLANCoordinate returns the coordinate of the node in the LAN gossip pool. +// GetLANCoordinate returns the coordinate of the node in the LAN gossip +// pool. +// +// - Clients return a single coordinate for the single gossip pool they are +// in (default, segment, or partition). +// +// - Servers return one coordinate for their canonical gossip pool (i.e. +// default partition/segment) and one per segment they are also ancillary +// members of. +// +// NOTE: servers do not emit coordinates for partitioned gossip pools they +// are ancillary members of. +// +// NOTE: This assumes coordinates are enabled, so check that before calling. func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { - // TODO(partitions): possibly something here lan, err := c.serf.GetCoordinate() if err != nil { return nil, err @@ -389,3 +417,11 @@ func (c *Client) ReloadConfig(config ReloadableConfig) error { c.rpcLimiter.Store(rate.NewLimiter(config.RPCRateLimit, config.RPCMaxBurst)) return nil } + +func (c *Client) AgentEnterpriseMeta() *structs.EnterpriseMeta { + return c.config.AgentEnterpriseMeta() +} + +func (c *Client) agentSegmentName() string { + return c.config.Segment +} diff --git a/agent/consul/client_serf.go b/agent/consul/client_serf.go index 3b632f6fd7..55df7a5471 100644 --- a/agent/consul/client_serf.go +++ b/agent/consul/client_serf.go @@ -49,11 +49,12 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.RejoinAfterLeave = c.config.RejoinAfterLeave conf.Merge = &lanMergeDelegate{ - dc: c.config.Datacenter, - nodeID: c.config.NodeID, - nodeName: c.config.NodeName, - segment: c.config.Segment, - server: false, + dc: c.config.Datacenter, + nodeID: c.config.NodeID, + nodeName: c.config.NodeName, + segment: c.config.Segment, + server: false, + partition: c.config.AgentEnterpriseMeta().PartitionOrDefault(), } conf.SnapshotPath = filepath.Join(c.config.DataDir, path) @@ -65,6 +66,8 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) ( conf.ReconnectTimeoutOverride = libserf.NewReconnectOverride(c.logger) + enterpriseModifyClientSerfConfigLAN(c.config, conf) + return serf.Create(conf) } diff --git a/agent/consul/config_test.go b/agent/consul/config_test.go index 7748895348..c536684c0e 100644 --- a/agent/consul/config_test.go +++ b/agent/consul/config_test.go @@ -19,26 +19,28 @@ func TestCloneSerfLANConfig(t *testing.T) { "Alive", "AwarenessMaxMultiplier", "Conflict", - "DNSConfigPath", "Delegate", "DelegateProtocolMax", "DelegateProtocolMin", "DelegateProtocolVersion", "DisableTcpPings", "DisableTcpPingsForNode", + "DNSConfigPath", "EnableCompression", "Events", "GossipToTheDeadTime", "HandoffQueueDepth", "IndirectChecks", - "LogOutput", + "Label", "Logger", + "LogOutput", "Merge", "Name", "Ping", "ProtocolVersion", "PushPullInterval", "RequireNodeNames", + "SkipInboundLabelCheck", "SuspicionMaxTimeoutMult", "TCPTimeout", "Transport", diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index 42819479af..896d374bc5 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -24,8 +24,9 @@ type Coordinate struct { logger hclog.Logger // updates holds pending coordinate updates for the given nodes. This is - // keyed by node:segment so we can get a coordinate for each segment for - // servers, and we only track the latest update per node:segment. + // keyed by partition/node:segment so we can get a coordinate for each + // segment for servers, and we only track the latest update per + // partition/node:segment. updates map[string]*structs.CoordinateUpdateRequest // updatesLock synchronizes access to the updates map. @@ -132,7 +133,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct // Since this is a coordinate coming from some place else we harden this // and look for dimensionality problems proactively. - coord, err := c.srv.serfLAN.GetCoordinate() + coord, err := c.srv.GetMatchingLANCoordinate(args.PartitionOrDefault(), args.Segment) if err != nil { return err } @@ -157,7 +158,7 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct } // Add the coordinate to the map of pending updates. - key := fmt.Sprintf("%s:%s", args.Node, args.Segment) + key := fmt.Sprintf("%s/%s:%s", args.PartitionOrDefault(), args.Node, args.Segment) c.updatesLock.Lock() c.updates[key] = args c.updatesLock.Unlock() @@ -174,8 +175,6 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter return err } - // TODO(partitions): should we filter any of this out? - var out []structs.DatacenterMap // Strip the datacenter suffixes from all the node names. @@ -253,8 +252,6 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde return acl.ErrPermissionDenied } - // TODO(partitions): do we have to add EnterpriseMeta to the reply like in Catalog.ListServices? - return c.srv.blockingQuery(&args.QueryOptions, &reply.QueryMeta, func(ws memdb.WatchSet, state *state.Store) error { diff --git a/agent/consul/enterprise_client_oss.go b/agent/consul/enterprise_client_oss.go index bedbe87347..3dcf34732a 100644 --- a/agent/consul/enterprise_client_oss.go +++ b/agent/consul/enterprise_client_oss.go @@ -1,3 +1,4 @@ +//go:build !consulent // +build !consulent package consul @@ -12,6 +13,10 @@ func (c *Client) initEnterprise(_ Deps) error { return nil } +func enterpriseModifyClientSerfConfigLAN(_ *Config, _ *serf.Config) { + // nothing +} + func (c *Client) startEnterprise() error { return nil } @@ -20,10 +25,6 @@ func (c *Client) handleEnterpriseUserEvents(event serf.UserEvent) bool { return false } -func (_ *Client) addEnterpriseSerfTags(_ map[string]string) { - // do nothing -} - func (c *Client) enterpriseStats() map[string]map[string]string { return nil } diff --git a/agent/consul/enterprise_server_oss.go b/agent/consul/enterprise_server_oss.go index f729fd810e..85c1b26f4e 100644 --- a/agent/consul/enterprise_server_oss.go +++ b/agent/consul/enterprise_server_oss.go @@ -84,6 +84,31 @@ func (s *Server) validateEnterpriseIntentionNamespace(ns string, _ bool) error { return errors.New("Namespaces is a Consul Enterprise feature") } +// setupSerfLAN is used to setup and initialize a Serf for the LAN +func (s *Server) setupSerfLAN(config *Config) error { + var err error + // Initialize the LAN Serf for the default network segment. + s.serfLAN, err = s.setupSerf(setupSerfOptions{ + Config: config.SerfLANConfig, + EventCh: s.eventChLAN, + SnapshotPath: serfLANSnapshot, + Listener: s.Listener, + WAN: false, + Segment: "", + Partition: "", + }) + if err != nil { + return err + } + return nil +} + +func (s *Server) shutdownSerfLAN() { + if s.serfLAN != nil { + s.serfLAN.Shutdown() + } +} + func addEnterpriseSerfTags(_ map[string]string, _ *structs.EnterpriseMeta) { // do nothing } diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index 11309b1f79..c971335804 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -87,11 +87,19 @@ func wantRaft(servers []*Server) error { // joinAddrLAN returns the address other servers can // use to join the cluster on the LAN interface. -func joinAddrLAN(s *Server) string { +func joinAddrLAN(s clientOrServer) string { if s == nil { - panic("no server") + panic("no client or server") + } + var port int + switch x := s.(type) { + case *Server: + port = x.config.SerfLANConfig.MemberlistConfig.BindPort + case *Client: + port = x.config.SerfLANConfig.MemberlistConfig.BindPort + default: + panic(fmt.Sprintf("unhandled type %T", s)) } - port := s.config.SerfLANConfig.MemberlistConfig.BindPort return fmt.Sprintf("127.0.0.1:%d", port) } @@ -110,6 +118,8 @@ func joinAddrWAN(s *Server) string { type clientOrServer interface { JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) LANMembersInAgentPartition() []serf.Member + AgentEnterpriseMeta() *structs.EnterpriseMeta + agentSegmentName() string } // joinLAN is a convenience function for @@ -117,27 +127,54 @@ type clientOrServer interface { // member.JoinLAN("127.0.0.1:"+leader.config.SerfLANConfig.MemberlistConfig.BindPort) func joinLAN(t *testing.T, member clientOrServer, leader *Server) { t.Helper() + joinLANWithOptions(t, member, leader, true) +} +func joinLANWithNoMembershipChecks(t *testing.T, member clientOrServer, leader *Server) { + t.Helper() + joinLANWithOptions(t, member, leader, false) +} +func joinLANWithOptions(t *testing.T, member clientOrServer, leader *Server, doMembershipChecks bool) { + t.Helper() if member == nil || leader == nil { panic("no server") } - var memberAddr string - switch x := member.(type) { - case *Server: - memberAddr = joinAddrLAN(x) - case *Client: - memberAddr = fmt.Sprintf("127.0.0.1:%d", x.config.SerfLANConfig.MemberlistConfig.BindPort) - } + memberAddr := joinAddrLAN(member) + + var ( + memberEntMeta = member.AgentEnterpriseMeta() + memberPartition = memberEntMeta.PartitionOrDefault() + memberSegment = member.agentSegmentName() + ) + leaderAddr := joinAddrLAN(leader) - if _, err := member.JoinLAN([]string{leaderAddr}, nil); err != nil { + if memberSegment != "" { + leaderAddr = leader.LANSegmentAddr(memberSegment) + } + if _, err := member.JoinLAN([]string{leaderAddr}, memberEntMeta); err != nil { t.Fatal(err) } + + if !doMembershipChecks { + return + } + + f := LANMemberFilter{ + Partition: memberPartition, + Segment: memberSegment, + } retry.Run(t, func(r *retry.R) { - if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) { + leaderView, err := leader.LANMembers(f) + require.NoError(r, err) + + if !seeEachOther(leaderView, member.LANMembersInAgentPartition(), leaderAddr, memberAddr) { r.Fatalf("leader and member cannot see each other on LAN") } }) - if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) { + + leaderView, err := leader.LANMembers(f) + require.NoError(t, err) + if !seeEachOther(leaderView, member.LANMembersInAgentPartition(), leaderAddr, memberAddr) { t.Fatalf("leader and member cannot see each other on LAN") } } @@ -147,6 +184,14 @@ func joinLAN(t *testing.T, member clientOrServer, leader *Server) { // member.JoinWAN("127.0.0.1:"+leader.config.SerfWANConfig.MemberlistConfig.BindPort) func joinWAN(t *testing.T, member, leader *Server) { t.Helper() + joinWANWithOptions(t, member, leader, true) +} +func joinWANWithNoMembershipChecks(t *testing.T, member, leader *Server) { + t.Helper() + joinWANWithOptions(t, member, leader, false) +} +func joinWANWithOptions(t *testing.T, member, leader *Server, doMembershipChecks bool) { + t.Helper() if member == nil || leader == nil { panic("no server") @@ -155,6 +200,11 @@ func joinWAN(t *testing.T, member, leader *Server) { if _, err := member.JoinWAN([]string{leaderAddr}); err != nil { t.Fatal(err) } + + if !doMembershipChecks { + return + } + retry.Run(t, func(r *retry.R) { if !seeEachOther(leader.WANMembers(), member.WANMembers(), leaderAddr, memberAddr) { r.Fatalf("leader and member cannot see each other on WAN") diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 03b8a55205..7eb725a531 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -6,7 +6,6 @@ import ( bexpr "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-multierror" "github.com/hashicorp/serf/serf" "github.com/hashicorp/consul/acl" @@ -418,16 +417,7 @@ func (m *Internal) EventFire(args *structs.EventFireRequest, eventName := userEventName(args.Name) // Fire the event on all LAN segments - segments := m.srv.LANSegments() - var errs error - for name, segment := range segments { - err := segment.UserEvent(eventName, args.Payload, false) - if err != nil { - err = fmt.Errorf("error broadcasting event to segment %q: %v", name, err) - errs = multierror.Append(errs, err) - } - } - return errs + return m.srv.LANSendUserEvent(eventName, args.Payload, false) } // KeyringOperation will query the WAN and LAN gossip keyrings of all nodes. @@ -492,14 +482,18 @@ func (m *Internal) KeyringOperation( func (m *Internal) executeKeyringOpLAN(args *structs.KeyringRequest) []*structs.KeyringResponse { responses := []*structs.KeyringResponse{} - segments := m.srv.LANSegments() - for name, segment := range segments { - mgr := segment.KeyManager() + _ = m.srv.DoWithLANSerfs(func(poolName, poolKind string, pool *serf.Serf) error { + mgr := pool.KeyManager() serfResp, err := m.executeKeyringOpMgr(mgr, args) resp := translateKeyResponseToKeyringResponse(serfResp, m.srv.config.Datacenter, err) - resp.Segment = name + if poolKind == PoolKindSegment { + resp.Segment = poolName + } else { + resp.Partition = poolName + } responses = append(responses, &resp) - } + return nil + }, nil) return responses } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index c78ab4ed88..674bc5a40e 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -134,13 +134,8 @@ func (s *Server) leaderLoop(stopCh chan struct{}) { // Fire a user event indicating a new leader payload := []byte(s.config.NodeName) - for name, segment := range s.LANSegments() { - if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil { - s.logger.Warn("failed to broadcast new leader event on segment", - "segment", name, - "error", err, - ) - } + if err := s.LANSendUserEvent(newLeaderEvent, payload, false); err != nil { + s.logger.Warn("failed to broadcast new leader event", "error", err) } // Reconcile channel is only used once initial reconcile diff --git a/agent/consul/leader_oss_test.go b/agent/consul/leader_oss_test.go new file mode 100644 index 0000000000..6aab96f89a --- /dev/null +++ b/agent/consul/leader_oss_test.go @@ -0,0 +1,14 @@ +//go:build !consulent +// +build !consulent + +package consul + +import libserf "github.com/hashicorp/consul/lib/serf" + +func updateSerfTags(s *Server, key, value string) { + libserf.UpdateTag(s.serfLAN, key, value) + + if s.serfWAN != nil { + libserf.UpdateTag(s.serfWAN, key, value) + } +} diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index 635999b33c..0f80de4e2b 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -17,7 +17,6 @@ import ( "github.com/hashicorp/consul/agent/structs" tokenStore "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" - libserf "github.com/hashicorp/consul/lib/serf" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -1770,14 +1769,6 @@ func TestDatacenterSupportsFederationStates(t *testing.T) { }) } -func updateSerfTags(s *Server, key, value string) { - libserf.UpdateTag(s.serfLAN, key, value) - - if s.serfWAN != nil { - libserf.UpdateTag(s.serfWAN, key, value) - } -} - func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/merge.go b/agent/consul/merge.go index 16bc55ddc8..04a41f0f53 100644 --- a/agent/consul/merge.go +++ b/agent/consul/merge.go @@ -14,16 +14,10 @@ import ( // ring. We check that the peers are in the same datacenter and abort the // merge if there is a mis-match. type lanMergeDelegate struct { - dc string - nodeID types.NodeID - nodeName string - segment string - - // TODO(partitions): use server and partition to reject gossip messages - // from nodes in the wrong partition depending upon the role the node is - // playing. For example servers will always be in the default partition, - // but all clients in all partitions should be aware of the servers so that - // general RPC routing works. + dc string + nodeID types.NodeID + nodeName string + segment string server bool partition string } @@ -81,9 +75,8 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { } } - if segment := m.Tags["segment"]; segment != md.segment { - return fmt.Errorf("Member '%s' part of wrong segment '%s' (expected '%s')", - m.Name, segment, md.segment) + if err := md.enterpriseNotifyMergeMember(m); err != nil { + return err } } return nil diff --git a/agent/consul/merge_oss.go b/agent/consul/merge_oss.go new file mode 100644 index 0000000000..515bbbcd1a --- /dev/null +++ b/agent/consul/merge_oss.go @@ -0,0 +1,22 @@ +//go:build !consulent +// +build !consulent + +package consul + +import ( + "fmt" + + "github.com/hashicorp/serf/serf" +) + +func (md *lanMergeDelegate) enterpriseNotifyMergeMember(m *serf.Member) error { + if memberPartition := m.Tags["ap"]; memberPartition != "" { + return fmt.Errorf("Member '%s' part of partition '%s'; Partitions are a Consul Enterprise feature", + m.Name, memberPartition) + } + if segment := m.Tags["segment"]; segment != "" { + return fmt.Errorf("Member '%s' part of segment '%s'; Network Segments are a Consul Enterprise feature", + m.Name, segment) + } + return nil +} diff --git a/agent/consul/merge_oss_test.go b/agent/consul/merge_oss_test.go new file mode 100644 index 0000000000..99333c7dd8 --- /dev/null +++ b/agent/consul/merge_oss_test.go @@ -0,0 +1,76 @@ +//go:build !consulent +// +build !consulent + +package consul + +import ( + "testing" + + "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/types" +) + +func TestMerge_OSS_LAN(t *testing.T) { + type testcase struct { + segment string + server bool + partition string + members []*serf.Member + expect string + } + + const thisNodeID = "ee954a2f-80de-4b34-8780-97b942a50a99" + + run := func(t *testing.T, tc testcase) { + delegate := &lanMergeDelegate{ + dc: "dc1", + nodeID: types.NodeID(thisNodeID), + nodeName: "node0", + segment: tc.segment, + server: tc.server, + partition: tc.partition, + } + + err := delegate.NotifyMerge(tc.members) + + if tc.expect == "" { + require.NoError(t, err) + } else { + testutil.RequireErrorContains(t, err, tc.expect) + } + } + + cases := map[string]testcase{ + "node in a segment": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + build: "0.7.5", + segment: "alpha", + }), + }, + expect: `Member 'node1' part of segment 'alpha'; Network Segments are a Consul Enterprise feature`, + }, + "node in a partition": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + build: "0.7.5", + partition: "part1", + }), + }, + expect: `Member 'node1' part of partition 'part1'; Partitions are a Consul Enterprise feature`, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/agent/consul/merge_test.go b/agent/consul/merge_test.go index 91e86a1243..7219edcab1 100644 --- a/agent/consul/merge_test.go +++ b/agent/consul/merge_test.go @@ -1,190 +1,232 @@ package consul import ( - "strings" "testing" - "github.com/hashicorp/consul/types" + uuid "github.com/hashicorp/go-uuid" "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/sdk/testutil" + "github.com/hashicorp/consul/types" ) -func makeNode(dc, name, id string, server bool, build string) *serf.Member { - var role string - if server { - role = "consul" - } else { - role = "node" +func TestMerge_LAN(t *testing.T) { + type testcase struct { + members []*serf.Member + expect string } - return &serf.Member{ - Name: name, + const thisNodeID = "ee954a2f-80de-4b34-8780-97b942a50a99" + + run := func(t *testing.T, tc testcase) { + delegate := &lanMergeDelegate{ + dc: "dc1", + nodeID: types.NodeID(thisNodeID), + nodeName: "node0", + } + + err := delegate.NotifyMerge(tc.members) + + if tc.expect == "" { + require.NoError(t, err) + } else { + testutil.RequireErrorContains(t, err, tc.expect) + } + } + + cases := map[string]testcase{ + "client in the wrong datacenter": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc2", + name: "node1", + server: false, + build: "0.7.5", + }), + }, + expect: "wrong datacenter", + }, + "server in the wrong datacenter": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc2", + name: "node1", + server: true, + build: "0.7.5", + }), + }, + expect: "wrong datacenter", + }, + "node ID conflict with delegate's ID": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + id: thisNodeID, + server: true, + build: "0.7.5", + }), + }, + expect: "with this agent's ID", + }, + "cluster with existing conflicting node IDs": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + id: "6185913b-98d7-4441-bd8f-f7f7d854a4af", + server: true, + build: "0.8.5", + }), + makeTestNode(t, testMember{ + dc: "dc1", + name: "node2", + id: "6185913b-98d7-4441-bd8f-f7f7d854a4af", + server: true, + build: "0.9.0", + }), + }, + expect: "with member", + }, + "cluster with existing conflicting node IDs, but version is old enough to skip the check": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + id: "6185913b-98d7-4441-bd8f-f7f7d854a4af", + server: true, + build: "0.8.5", + }), + makeTestNode(t, testMember{ + dc: "dc1", + name: "node2", + id: "6185913b-98d7-4441-bd8f-f7f7d854a4af", + server: true, + build: "0.8.4", + }), + }, + expect: "with member", + }, + "good cluster": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc1", + name: "node1", + server: true, + build: "0.8.5", + }), + makeTestNode(t, testMember{ + dc: "dc1", + name: "node2", + server: true, + build: "0.8.5", + }), + }, + expect: "", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestMerge_WAN(t *testing.T) { + type testcase struct { + members []*serf.Member + expect string + } + + run := func(t *testing.T, tc testcase) { + delegate := &wanMergeDelegate{} + err := delegate.NotifyMerge(tc.members) + if tc.expect == "" { + require.NoError(t, err) + } else { + testutil.RequireErrorContains(t, err, tc.expect) + } + } + + cases := map[string]testcase{ + "not a server": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc2", + name: "node1", + server: false, + build: "0.7.5", + }), + }, + expect: "not a server", + }, + "good cluster": { + members: []*serf.Member{ + makeTestNode(t, testMember{ + dc: "dc2", + name: "node1", + server: true, + build: "0.7.5", + }), + makeTestNode(t, testMember{ + dc: "dc3", + name: "node2", + server: true, + build: "0.7.5", + }), + }, + expect: "", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} + +type testMember struct { + dc string + name string + id string + server bool + build string + segment string + partition string +} + +func (tm testMember) role() string { + if tm.server { + return "consul" + } + return "node" +} + +func makeTestNode(t *testing.T, tm testMember) *serf.Member { + if tm.id == "" { + uuid, err := uuid.GenerateUUID() + require.NoError(t, err) + tm.id = uuid + } + m := &serf.Member{ + Name: tm.name, Tags: map[string]string{ - "role": role, - "dc": dc, - "id": id, + "role": tm.role(), + "dc": tm.dc, + "id": tm.id, "port": "8300", - "build": build, + "segment": tm.segment, + "build": tm.build, "vsn": "2", "vsn_max": "3", "vsn_min": "2", }, } -} - -func TestMerge_LAN(t *testing.T) { - t.Parallel() - cases := []struct { - members []*serf.Member - expect string - }{ - // Client in the wrong datacenter. - { - members: []*serf.Member{ - makeNode("dc2", - "node1", - "96430788-246f-4379-94ce-257f7429e340", - false, - "0.7.5"), - }, - expect: "wrong datacenter", - }, - // Server in the wrong datacenter. - { - members: []*serf.Member{ - makeNode("dc2", - "node1", - "96430788-246f-4379-94ce-257f7429e340", - true, - "0.7.5"), - }, - expect: "wrong datacenter", - }, - // Node ID conflict with delegate's ID. - { - members: []*serf.Member{ - makeNode("dc1", - "node1", - "ee954a2f-80de-4b34-8780-97b942a50a99", - true, - "0.7.5"), - }, - expect: "with this agent's ID", - }, - // Cluster with existing conflicting node IDs. - { - members: []*serf.Member{ - makeNode("dc1", - "node1", - "6185913b-98d7-4441-bd8f-f7f7d854a4af", - true, - "0.8.5"), - makeNode("dc1", - "node2", - "6185913b-98d7-4441-bd8f-f7f7d854a4af", - true, - "0.9.0"), - }, - expect: "with member", - }, - // Cluster with existing conflicting node IDs, but version is - // old enough to skip the check. - { - members: []*serf.Member{ - makeNode("dc1", - "node1", - "6185913b-98d7-4441-bd8f-f7f7d854a4af", - true, - "0.8.5"), - makeNode("dc1", - "node2", - "6185913b-98d7-4441-bd8f-f7f7d854a4af", - true, - "0.8.4"), - }, - expect: "with member", - }, - // Good cluster. - { - members: []*serf.Member{ - makeNode("dc1", - "node1", - "6185913b-98d7-4441-bd8f-f7f7d854a4af", - true, - "0.8.5"), - makeNode("dc1", - "node2", - "cda916bc-a357-4a19-b886-59419fcee50c", - true, - "0.8.5"), - }, - expect: "", - }, - } - - delegate := &lanMergeDelegate{ - dc: "dc1", - nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"), - nodeName: "node0", - segment: "", - } - for i, c := range cases { - if err := delegate.NotifyMerge(c.members); c.expect == "" { - if err != nil { - t.Fatalf("case %d: err: %v", i+1, err) - } - } else { - if err == nil || !strings.Contains(err.Error(), c.expect) { - t.Fatalf("case %d: err: %v", i+1, err) - } - } - } -} - -func TestMerge_WAN(t *testing.T) { - t.Parallel() - cases := []struct { - members []*serf.Member - expect string - }{ - // Not a server - { - members: []*serf.Member{ - makeNode("dc2", - "node1", - "96430788-246f-4379-94ce-257f7429e340", - false, - "0.7.5"), - }, - expect: "not a server", - }, - // Good cluster. - { - members: []*serf.Member{ - makeNode("dc2", - "node1", - "6185913b-98d7-4441-bd8f-f7f7d854a4af", - true, - "0.7.5"), - makeNode("dc3", - "node2", - "cda916bc-a357-4a19-b886-59419fcee50c", - true, - "0.7.5"), - }, - expect: "", - }, - } - - delegate := &wanMergeDelegate{} - for i, c := range cases { - if err := delegate.NotifyMerge(c.members); c.expect == "" { - if err != nil { - t.Fatalf("case %d: err: %v", i+1, err) - } - } else { - if err == nil || !strings.Contains(err.Error(), c.expect) { - t.Fatalf("case %d: err: %v", i+1, err) - } - } - } + if tm.partition != "" { + m.Tags["ap"] = tm.partition + } + return m } diff --git a/agent/consul/segment_oss.go b/agent/consul/segment_oss.go index 1398087a41..034e79c54e 100644 --- a/agent/consul/segment_oss.go +++ b/agent/consul/segment_oss.go @@ -1,14 +1,12 @@ +//go:build !consulent // +build !consulent package consul import ( "net" - "time" - "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" - "github.com/hashicorp/serf/serf" "github.com/hashicorp/consul/agent/structs" ) @@ -37,7 +35,7 @@ func (s *Server) setupSegmentRPC() (map[string]net.Listener, error) { // setupSegments returns an error if any segments are defined since the OSS // version of Consul doesn't support them. -func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string]net.Listener) error { +func (s *Server) setupSegments(config *Config, rpcListeners map[string]net.Listener) error { if len(config.Segments) > 0 { return structs.ErrSegmentsNotSupported } @@ -48,28 +46,3 @@ func (s *Server) setupSegments(config *Config, port int, rpcListeners map[string // floodSegments is a NOP in the OSS version of Consul. func (s *Server) floodSegments(config *Config) { } - -func getSerfMemberEnterpriseMeta(member serf.Member) *structs.EnterpriseMeta { - return structs.NodeEnterpriseMetaInDefaultPartition() -} - -// reconcile is used to reconcile the differences between Serf membership and -// what is reflected in our strongly consistent store. Mainly we need to ensure -// all live nodes are registered, all failed nodes are marked as such, and all -// left nodes are deregistered. -func (s *Server) reconcile() (err error) { - defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now()) - - members := s.serfLAN.Members() - knownMembers := make(map[string]struct{}) - for _, member := range members { - if err := s.reconcileMember(member); err != nil { - return err - } - knownMembers[member.Name] = struct{}{} - } - - // Reconcile any members that have been reaped while we were not the - // leader. - return s.reconcileReaped(knownMembers, nil) -} diff --git a/agent/consul/server.go b/agent/consul/server.go index 524a0f1eae..1d4bedb6b5 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -121,6 +121,11 @@ var ( ErrWANFederationDisabled = fmt.Errorf("WAN Federation is disabled") ) +const ( + PoolKindPartition = "partition" + PoolKindSegment = "segment" +) + // Server is Consul server which manages the service discovery, // health checking, DC forwarding, Raft, and multiple Serf pools. type Server struct { @@ -248,11 +253,15 @@ type Server struct { // serfLAN is the Serf cluster maintained inside the DC // which contains all the DC nodes + // + // - If Network Segments are active, this only contains members in the + // default segment. + // + // - If Admin Partitions are active, this only contains members in the + // default partition. + // serfLAN *serf.Serf - // segmentLAN maps segment names to their Serf cluster - segmentLAN map[string]*serf.Serf - // serfWAN is the Serf cluster maintained between DC's // which SHOULD only consist of Consul servers serfWAN *serf.Serf @@ -362,7 +371,6 @@ func NewServer(config *Config, flat Deps) (*Server, error) { insecureRPCServer: rpc.NewServer(), tlsConfigurator: flat.TLSConfigurator, reassertLeaderCh: make(chan chan error), - segmentLAN: make(map[string]*serf.Serf, len(config.Segments)), sessionTimers: NewSessionTimers(), tombstoneGC: gc, serverLookup: NewServerLookup(), @@ -483,10 +491,14 @@ func NewServer(config *Config, flat Deps) (*Server, error) { // a little gross to be reading the updated config. // Initialize the WAN Serf if enabled - serfBindPortWAN := -1 if config.SerfWANConfig != nil { - serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort - s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "", s.Listener) + s.serfWAN, err = s.setupSerf(setupSerfOptions{ + Config: config.SerfWANConfig, + EventCh: s.eventChWAN, + SnapshotPath: serfWANSnapshot, + WAN: true, + Listener: s.Listener, + }) if err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start WAN Serf: %v", err) @@ -497,6 +509,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(wanfed.IngestionAwareTransport) // See big comment above why we are doing this. + serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort if serfBindPortWAN == 0 { serfBindPortWAN = config.SerfWANConfig.MemberlistConfig.BindPort if serfBindPortWAN == 0 { @@ -508,14 +521,13 @@ func NewServer(config *Config, flat Deps) (*Server, error) { // Initialize the LAN segments before the default LAN Serf so we have // updated port information to publish there. - if err := s.setupSegments(config, serfBindPortWAN, segmentListeners); err != nil { + if err := s.setupSegments(config, segmentListeners); err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to setup network segments: %v", err) } // Initialize the LAN Serf for the default network segment. - s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "", s.Listener) - if err != nil { + if err := s.setupSerfLAN(config); err != nil { s.Shutdown() return nil, fmt.Errorf("Failed to start LAN Serf: %v", err) } @@ -926,13 +938,7 @@ func (s *Server) Shutdown() error { s.leaderRoutineManager.StopAll() } - if s.serfLAN != nil { - s.serfLAN.Shutdown() - } - - for _, segment := range s.segmentLAN { - segment.Shutdown() - } + s.shutdownSerfLAN() if s.serfWAN != nil { s.serfWAN.Shutdown() @@ -942,6 +948,8 @@ func (s *Server) Shutdown() error { } s.router.Shutdown() + // TODO: actually shutdown areas? + if s.raft != nil { s.raftTransport.Close() s.raftLayer.Close() @@ -1100,13 +1108,6 @@ func (s *Server) Leave() error { return nil } -// JoinLAN is used to have Consul join the inner-DC pool The target address -// should be another node inside the DC listening on the Serf LAN address -func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) { - // TODO(partitions): handle the different partitions - return s.serfLAN.Join(addrs, true) -} - // JoinWAN is used to have Consul join the cross-WAN Consul ring // The target address should be another node listening on the // Serf WAN address @@ -1157,7 +1158,9 @@ func (s *Server) AgentLocalMember() serf.Member { return s.serfLAN.LocalMember() } -// LANMembersInAgentPartition is used to return the members of the LAN cluster +// LANMembersInAgentPartition returns the LAN members for this agent's +// canonical serf pool. For clients this is the only pool that exists. For +// servers it's the pool in the default segment and the default partition. func (s *Server) LANMembersInAgentPartition() []serf.Member { return s.serfLAN.Members() } @@ -1172,7 +1175,6 @@ func (s *Server) WANMembers() []serf.Member { // RemoveFailedNode is used to remove a failed node from the cluster. func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error { - // TODO(partitions): handle the different partitions var removeFn func(*serf.Serf, string) error if prune { removeFn = (*serf.Serf).RemoveFailedNodePrune @@ -1180,10 +1182,6 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente removeFn = (*serf.Serf).RemoveFailedNode } - if err := removeFn(s.serfLAN, node); err != nil { - return err - } - wanNode := node // The Serf WAN pool stores members as node.datacenter @@ -1191,13 +1189,8 @@ func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.Ente if !strings.HasSuffix(node, "."+s.config.Datacenter) { wanNode = node + "." + s.config.Datacenter } - if s.serfWAN != nil { - if err := removeFn(s.serfWAN, wanNode); err != nil { - return err - } - } - return s.removeFailedNodeEnterprise(removeFn, node, wanNode) + return s.removeFailedNode(removeFn, node, wanNode, entMeta) } // IsLeader checks if this server is the cluster leader @@ -1213,6 +1206,7 @@ func (s *Server) LeaderLastContact() time.Time { // KeyManagerLAN returns the LAN Serf keyring manager func (s *Server) KeyManagerLAN() *serf.KeyManager { + // NOTE: The serfLAN keymanager is shared by all partitions. return s.serfLAN.KeyManager() } @@ -1221,15 +1215,8 @@ func (s *Server) KeyManagerWAN() *serf.KeyManager { return s.serfWAN.KeyManager() } -// LANSegments returns a map of LAN segments by name -func (s *Server) LANSegments() map[string]*serf.Serf { - segments := make(map[string]*serf.Serf, len(s.segmentLAN)+1) - segments[""] = s.serfLAN - for name, segment := range s.segmentLAN { - segments[name] = segment - } - - return segments +func (s *Server) AgentEnterpriseMeta() *structs.EnterpriseMeta { + return s.config.AgentEnterpriseMeta() } // inmemCodec is used to do an RPC call without going over a network @@ -1379,10 +1366,25 @@ func (s *Server) Stats() map[string]map[string]string { stats["serf_wan"] = s.serfWAN.Stats() } + s.addEnterpriseStats(stats) + return stats } -// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool. +// GetLANCoordinate returns the coordinate of the node in the LAN gossip +// pool. +// +// - Clients return a single coordinate for the single gossip pool they are +// in (default, segment, or partition). +// +// - Servers return one coordinate for their canonical gossip pool (i.e. +// default partition/segment) and one per segment they are also ancillary +// members of. +// +// NOTE: servers do not emit coordinates for partitioned gossip pools they +// are ancillary members of. +// +// NOTE: This assumes coordinates are enabled, so check that before calling. func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { lan, err := s.serfLAN.GetCoordinate() if err != nil { @@ -1390,16 +1392,17 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) { } cs := lib.CoordinateSet{"": lan} - for name, segment := range s.segmentLAN { - c, err := segment.GetCoordinate() - if err != nil { - return nil, err - } - cs[name] = c + if err := s.addEnterpriseLANCoordinates(cs); err != nil { + return nil, err } + return cs, nil } +func (s *Server) agentSegmentName() string { + return s.config.Segment +} + // ReloadConfig is used to have the Server do an online reload of // relevant configuration information func (s *Server) ReloadConfig(config ReloadableConfig) error { diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index ad5a30eb07..b49b716b91 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -4,18 +4,68 @@ package consul import ( + "fmt" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/go-multierror" + "github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/serf" "google.golang.org/grpc" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" ) -func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error { - // nothing to do for oss - return nil +func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {} + +// JoinLAN is used to have Consul join the inner-DC pool The target address +// should be another node inside the DC listening on the Serf LAN address +func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) { + return s.serfLAN.Join(addrs, true) } -func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {} +// removeFailedNode is used to remove a failed node from the cluster +func (s *Server) removeFailedNode( + removeFn func(*serf.Serf, string) error, + node, wanNode string, + entMeta *structs.EnterpriseMeta, +) error { + maybeRemove := func(s *serf.Serf, node string) (bool, error) { + if !isSerfMember(s, node) { + return false, nil + } + return true, removeFn(s, node) + } + + foundAny := false + + var merr error + + if found, err := maybeRemove(s.serfLAN, node); err != nil { + merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from LAN: %w", err)) + } else if found { + foundAny = true + } + + if s.serfWAN != nil { + if found, err := maybeRemove(s.serfWAN, wanNode); err != nil { + merr = multierror.Append(merr, fmt.Errorf("could not remove failed node from WAN: %w", err)) + } else if found { + foundAny = true + } + } + + if merr != nil { + return merr + } + + if !foundAny { + return fmt.Errorf("agent: No node found with name '%s'", node) + } + + return nil +} // lanPoolAllMembers only returns our own segment or partition's members, because // OSS servers can't be in multiple segments or partitions. @@ -39,3 +89,62 @@ func (s *Server) LANMembers(filter LANMemberFilter) ([]serf.Member, error) { } return s.LANMembersInAgentPartition(), nil } + +func (s *Server) GetMatchingLANCoordinate(_, _ string) (*coordinate.Coordinate, error) { + return s.serfLAN.GetCoordinate() +} + +func (s *Server) addEnterpriseLANCoordinates(cs lib.CoordinateSet) error { + return nil +} + +func (s *Server) LANSendUserEvent(name string, payload []byte, coalesce bool) error { + err := s.serfLAN.UserEvent(name, payload, coalesce) + if err != nil { + return fmt.Errorf("error broadcasting event: %w", err) + } + return nil +} + +func (s *Server) DoWithLANSerfs( + fn func(name, poolKind string, pool *serf.Serf) error, + errorFn func(name, poolKind string, err error) error, +) error { + if errorFn == nil { + errorFn = func(_, _ string, err error) error { return err } + } + err := fn("", "", s.serfLAN) + if err != nil { + return errorFn("", "", err) + } + return nil +} + +// reconcile is used to reconcile the differences between Serf membership and +// what is reflected in our strongly consistent store. Mainly we need to ensure +// all live nodes are registered, all failed nodes are marked as such, and all +// left nodes are deregistered. +func (s *Server) reconcile() (err error) { + defer metrics.MeasureSince([]string{"leader", "reconcile"}, time.Now()) + + members := s.serfLAN.Members() + knownMembers := make(map[string]struct{}) + for _, member := range members { + if err := s.reconcileMember(member); err != nil { + return err + } + knownMembers[member.Name] = struct{}{} + } + + // Reconcile any members that have been reaped while we were not the + // leader. + return s.reconcileReaped(knownMembers, nil) +} + +func (s *Server) addEnterpriseStats(stats map[string]map[string]string) { + // no-op +} + +func getSerfMemberEnterpriseMeta(member serf.Member) *structs.EnterpriseMeta { + return structs.NodeEnterpriseMetaInDefaultPartition() +} diff --git a/agent/consul/server_serf.go b/agent/consul/server_serf.go index 1950c6c326..f5864f654e 100644 --- a/agent/consul/server_serf.go +++ b/agent/consul/server_serf.go @@ -1,6 +1,7 @@ package consul import ( + "errors" "fmt" "net" "path/filepath" @@ -32,29 +33,69 @@ const ( maxPeerRetries = 6 ) +type setupSerfOptions struct { + Config *serf.Config + EventCh chan serf.Event + SnapshotPath string + Listener net.Listener + + // WAN only + WAN bool + + // LAN only + Segment string + Partition string +} + // setupSerf is used to setup and initialize a Serf -func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int, - segment string, listener net.Listener) (*serf.Serf, error) { +func (s *Server) setupSerf(opts setupSerfOptions) (*serf.Serf, error) { + conf, err := s.setupSerfConfig(opts) + if err != nil { + return nil, err + } + return serf.Create(conf) +} + +func (s *Server) setupSerfConfig(opts setupSerfOptions) (*serf.Config, error) { + if opts.Config == nil { + return nil, errors.New("serf config is a required field") + } + if opts.Listener == nil { + return nil, errors.New("listener is a required field") + } + if opts.WAN { + if opts.Segment != "" { + return nil, errors.New("cannot configure segments on the WAN serf pool") + } + if opts.Partition != "" { + return nil, errors.New("cannot configure partitions on the WAN serf pool") + } + } + + conf := opts.Config conf.Init() - if wan { + if opts.WAN { conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter) } else { conf.NodeName = s.config.NodeName - if wanPort > 0 { - conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort) + if s.config.SerfWANConfig != nil { + serfBindPortWAN := s.config.SerfWANConfig.MemberlistConfig.BindPort + if serfBindPortWAN > 0 { + conf.Tags["wan_join_port"] = fmt.Sprintf("%d", serfBindPortWAN) + } } } conf.Tags["role"] = "consul" conf.Tags["dc"] = s.config.Datacenter - conf.Tags["segment"] = segment + conf.Tags["segment"] = opts.Segment conf.Tags["id"] = string(s.config.NodeID) conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion) conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin) conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax) conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion) conf.Tags["build"] = s.config.Build - addr := listener.Addr().(*net.TCPAddr) + addr := opts.Listener.Addr().(*net.TCPAddr) conf.Tags["port"] = fmt.Sprintf("%d", addr.Port) if s.config.Bootstrap { conf.Tags["bootstrap"] = "1" @@ -87,7 +128,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.Tags["ft_si"] = "1" var subLoggerName string - if wan { + if opts.WAN { subLoggerName = logging.WAN } else { subLoggerName = logging.LAN @@ -107,22 +148,23 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w conf.MemberlistConfig.Logger = memberlistLogger conf.Logger = serfLogger - conf.EventCh = ch + conf.EventCh = opts.EventCh conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion] conf.RejoinAfterLeave = s.config.RejoinAfterLeave - if wan { + if opts.WAN { conf.Merge = &wanMergeDelegate{} } else { conf.Merge = &lanMergeDelegate{ - dc: s.config.Datacenter, - nodeID: s.config.NodeID, - nodeName: s.config.NodeName, - segment: segment, - server: true, + dc: s.config.Datacenter, + nodeID: s.config.NodeID, + nodeName: s.config.NodeName, + segment: opts.Segment, + partition: opts.Partition, + server: true, } } - if wan { + if opts.WAN { nt, err := memberlist.NewNetTransport(&memberlist.NetTransportConfig{ BindAddrs: []string{conf.MemberlistConfig.BindAddr}, BindPort: conf.MemberlistConfig.BindPort, @@ -154,7 +196,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w // node which is rather unexpected. conf.EnableNameConflictResolution = false - if wan && s.config.ConnectMeshGatewayWANFederationEnabled { + if opts.WAN && s.config.ConnectMeshGatewayWANFederationEnabled { conf.MemberlistConfig.RequireNodeNames = true conf.MemberlistConfig.DisableTcpPingsForNode = func(nodeName string) bool { _, dc, err := wanfed.SplitNodeName(nodeName) @@ -169,7 +211,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w } if !s.config.DevMode { - conf.SnapshotPath = filepath.Join(s.config.DataDir, path) + conf.SnapshotPath = filepath.Join(s.config.DataDir, opts.SnapshotPath) } if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { return nil, err @@ -183,7 +225,7 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w s.config.OverrideInitialSerfTags(conf.Tags) } - return serf.Create(conf) + return conf, nil } // userEventName computes the name of a user event diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 84cb868b38..978d2453cd 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -32,7 +32,6 @@ import ( "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -389,35 +388,38 @@ func TestServer_JoinLAN(t *testing.T) { }) } -// TestServer_JoinLAN_SerfAllowedCIDRs test that IPs might be blocked -// with Serf. -// To run properly, this test requires to be able to bind and have access -// on 127.0.1.1 which is the case for most Linux machines and Windows, -// so Unit test will run in the CI. -// To run it on Mac OS, please run this commandd first, otherwise the -// test will be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up` +// TestServer_JoinLAN_SerfAllowedCIDRs test that IPs might be blocked with +// Serf. +// +// To run properly, this test requires to be able to bind and have access on +// 127.0.1.1 which is the case for most Linux machines and Windows, so Unit +// test will run in the CI. +// +// To run it on Mac OS, please run this command first, otherwise the test will +// be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up` func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) { t.Parallel() + + const targetAddr = "127.0.1.1" + + skipIfCannotBindToIP(t, targetAddr) + dir1, s1 := testServerWithConfig(t, func(c *Config) { c.BootstrapExpect = 1 lan, err := memberlist.ParseCIDRs([]string{"127.0.0.1/32"}) - assert.NoError(t, err) + require.NoError(t, err) c.SerfLANConfig.MemberlistConfig.CIDRsAllowed = lan wan, err := memberlist.ParseCIDRs([]string{"127.0.0.0/24", "::1/128"}) - assert.NoError(t, err) + require.NoError(t, err) c.SerfWANConfig.MemberlistConfig.CIDRsAllowed = wan }) defer os.RemoveAll(dir1) defer s1.Shutdown() - targetAddr := "127.0.1.1" - dir2, a2, err := testClientWithConfigWithErr(t, func(c *Config) { + dir2, a2 := testClientWithConfig(t, func(c *Config) { c.SerfLANConfig.MemberlistConfig.BindAddr = targetAddr }) defer os.RemoveAll(dir2) - if err != nil { - t.Skipf("Cannot bind on %s, to run on Mac OS: `sudo ifconfig lo0 alias 127.0.1.1 up`", targetAddr) - } defer a2.Shutdown() dir3, rs3 := testServerWithConfig(t, func(c *Config) { @@ -451,6 +453,80 @@ func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) { }) } +// TestServer_JoinWAN_SerfAllowedCIDRs test that IPs might be +// blocked with Serf. +// +// To run properly, this test requires to be able to bind and have access on +// 127.0.1.1 which is the case for most Linux machines and Windows, so Unit +// test will run in the CI. +// +// To run it on Mac OS, please run this command first, otherwise the test will +// be skipped: `sudo ifconfig lo0 alias 127.0.1.1 up` +func TestServer_JoinWAN_SerfAllowedCIDRs(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + const targetAddr = "127.0.1.1" + + skipIfCannotBindToIP(t, targetAddr) + + wanCIDRs, err := memberlist.ParseCIDRs([]string{"127.0.0.1/32"}) + require.NoError(t, err) + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = true + c.BootstrapExpect = 1 + c.Datacenter = "dc1" + c.SerfWANConfig.MemberlistConfig.CIDRsAllowed = wanCIDRs + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + waitForLeaderEstablishment(t, s1) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + dir2, s2 := testServerWithConfig(t, func(c *Config) { + c.Bootstrap = true + c.BootstrapExpect = 1 + c.PrimaryDatacenter = "dc1" + c.Datacenter = "dc2" + c.SerfWANConfig.MemberlistConfig.BindAddr = targetAddr + }) + defer os.RemoveAll(dir2) + defer s2.Shutdown() + + waitForLeaderEstablishment(t, s2) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Joining should be fine + joinWANWithNoMembershipChecks(t, s2, s1) + + // But membership is blocked if you go and take a peek on the server. + t.Run("LAN membership should only show each other", func(t *testing.T) { + require.Len(t, s1.LANMembersInAgentPartition(), 1) + require.Len(t, s2.LANMembersInAgentPartition(), 1) + }) + t.Run("WAN membership in the primary should not show the secondary", func(t *testing.T) { + require.Len(t, s1.WANMembers(), 1) + }) + t.Run("WAN membership in the secondary can show the primary", func(t *testing.T) { + require.Len(t, s2.WANMembers(), 2) + }) +} + +func skipIfCannotBindToIP(t *testing.T, ip string) { + ports := freeport.MustTake(1) + defer freeport.Return(ports) + + addr := ipaddr.FormatAddressPort(ip, ports[0]) + l, err := net.Listen("tcp", addr) + l.Close() + if err != nil { + t.Skipf("Cannot bind on %s, to run on Mac OS: `sudo ifconfig lo0 alias %s up`", ip, ip) + } +} + func TestServer_LANReap(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/util.go b/agent/consul/util.go index 09e69381a9..3e6b17a287 100644 --- a/agent/consul/util.go +++ b/agent/consul/util.go @@ -158,3 +158,12 @@ func (c *Client) CheckServers(datacenter string, fn func(*metadata.Server) bool) c.router.CheckServers(datacenter, fn) } + +func isSerfMember(s *serf.Serf, nodeName string) bool { + for _, m := range s.Members() { + if m.Name == nodeName { + return true + } + } + return false +} diff --git a/agent/coordinate_endpoint_test.go b/agent/coordinate_endpoint_test.go index 78d5ff6095..36b956a8f8 100644 --- a/agent/coordinate_endpoint_test.go +++ b/agent/coordinate_endpoint_test.go @@ -8,11 +8,12 @@ import ( "testing" "time" + "github.com/hashicorp/serf/coordinate" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/serf/coordinate" ) func TestCoordinate_Disabled_Response(t *testing.T) { @@ -137,7 +138,6 @@ func TestCoordinate_Nodes(t *testing.T) { arg1 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "foo", - Segment: "alpha", Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), } var out struct{} @@ -178,65 +178,6 @@ func TestCoordinate_Nodes(t *testing.T) { r.Fatalf("expected: bar, foo received: %v", coordinates) } }) - // Filter on a nonexistent node segment - req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=nope", nil) - resp = httptest.NewRecorder() - retry.Run(t, func(r *retry.R) { - obj, err := a.srv.CoordinateNodes(resp, req) - if err != nil { - r.Fatalf("err: %v", err) - } - - if resp.Code != http.StatusOK { - r.Fatalf("bad: %v", resp.Code) - } - - coordinates, ok := obj.(structs.Coordinates) - if !ok { - r.Fatalf("expected: structs.Coordinates, received: %+v", obj) - } - if len(coordinates) != 0 { - r.Fatalf("coordinates should be empty, received: %v", coordinates) - } - }) - // Filter on a real node segment - req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=alpha", nil) - resp = httptest.NewRecorder() - retry.Run(t, func(r *retry.R) { - obj, err := a.srv.CoordinateNodes(resp, req) - if err != nil { - r.Fatalf("err: %v", err) - } - - if resp.Code != http.StatusOK { - r.Fatalf("bad: %v", resp.Code) - } - - coordinates, ok := obj.(structs.Coordinates) - if !ok { - r.Fatalf("expected: structs.Coordinates, received: %+v", obj) - } - if len(coordinates) != 1 || coordinates[0].Node != "foo" { - r.Fatalf("expected: foo received: %v", coordinates) - } - }) - // Make sure the empty filter works - req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=", nil) - resp = httptest.NewRecorder() - retry.Run(t, func(r *retry.R) { - obj, err := a.srv.CoordinateNodes(resp, req) - if err != nil { - r.Fatalf("err: %v", err) - } - - coordinates, ok := obj.(structs.Coordinates) - if !ok { - r.Fatalf("expected: structs.Coordinates, received: %+v", obj) - } - if len(coordinates) != 1 || coordinates[0].Node != "bar" { - r.Fatalf("expected: bar received: %v", coordinates) - } - }) } func TestCoordinate_Node(t *testing.T) { @@ -280,7 +221,6 @@ func TestCoordinate_Node(t *testing.T) { arg1 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "foo", - Segment: "alpha", Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), } var out struct{} @@ -315,45 +255,6 @@ func TestCoordinate_Node(t *testing.T) { coordinates[0].Node != "foo" { t.Fatalf("bad: %v", coordinates) } - - // Filter on a nonexistent node segment - req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=nope", nil) - resp = httptest.NewRecorder() - _, err = a.srv.CoordinateNode(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } - if resp.Code != http.StatusNotFound { - t.Fatalf("bad: %v", resp.Code) - } - - // Filter on a real node segment - req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=alpha", nil) - resp = httptest.NewRecorder() - obj, err = a.srv.CoordinateNode(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } - - if resp.Code != http.StatusOK { - t.Fatalf("bad: %v", resp.Code) - } - - coordinates = obj.(structs.Coordinates) - if len(coordinates) != 1 || coordinates[0].Node != "foo" { - t.Fatalf("bad: %v", coordinates) - } - - // Make sure the empty filter works - req, _ = http.NewRequest("GET", "/v1/coordinate/node/foo?segment=", nil) - resp = httptest.NewRecorder() - _, err = a.srv.CoordinateNode(resp, req) - if err != nil { - t.Fatalf("err: %v", err) - } - if resp.Code != http.StatusNotFound { - t.Fatalf("bad: %v", resp.Code) - } } func TestCoordinate_Update(t *testing.T) { diff --git a/agent/local/state_test.go b/agent/local/state_test.go index f4be09ba12..46d0d6e6f3 100644 --- a/agent/local/state_test.go +++ b/agent/local/state_test.go @@ -2042,6 +2042,7 @@ func TestAgent_sendCoordinate(t *testing.T) { } t.Parallel() + a := agent.StartTestAgent(t, agent.TestAgent{Overrides: ` sync_coordinate_interval_min = "1ms" sync_coordinate_rate_target = 10.0 diff --git a/agent/metadata/server.go b/agent/metadata/server.go index 6fdad57c88..3715032c6a 100644 --- a/agent/metadata/server.go +++ b/agent/metadata/server.go @@ -32,6 +32,7 @@ type Server struct { SegmentAddrs map[string]string SegmentPorts map[string]int WanJoinPort int + LanJoinPort int Bootstrap bool Expect int Build version.Version @@ -168,6 +169,7 @@ func IsConsulServer(m serf.Member) (bool, *Server) { SegmentAddrs: segmentAddrs, SegmentPorts: segmentPorts, WanJoinPort: wanJoinPort, + LanJoinPort: int(m.Port), Bootstrap: bootstrap, Expect: expect, Addr: addr, diff --git a/agent/operator_endpoint.go b/agent/operator_endpoint.go index 644fe7fba6..e43302aef3 100644 --- a/agent/operator_endpoint.go +++ b/agent/operator_endpoint.go @@ -6,11 +6,12 @@ import ( "strconv" "time" - "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/api" multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/raft" autopilot "github.com/hashicorp/raft-autopilot" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" ) // OperatorRaftConfiguration is used to inspect the current Raft configuration. @@ -172,6 +173,11 @@ func keyringErrorsOrNil(responses []*structs.KeyringResponse) error { if response.WAN { pool = "WAN" } + if response.Segment != "" { + pool += " [segment: " + response.Segment + "]" + } else if !structs.IsDefaultPartition(response.Partition) { + pool += " [partition: " + response.Partition + "]" + } errs = multierror.Append(errs, fmt.Errorf("%s error: %s", pool, response.Error)) for key, message := range response.Messages { errs = multierror.Append(errs, fmt.Errorf("%s: %s", key, message)) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 386280e7dd..cef5fd13b9 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -2591,6 +2591,7 @@ type KeyringResponse struct { WAN bool Datacenter string Segment string + Partition string `json:",omitempty"` Messages map[string]string `json:",omitempty"` Keys map[string]int PrimaryKeys map[string]int @@ -2598,6 +2599,10 @@ type KeyringResponse struct { Error string `json:",omitempty"` } +func (r *KeyringResponse) PartitionOrDefault() string { + return PartitionOrDefault(r.Partition) +} + // KeyringResponses holds multiple responses to keyring queries. Each // datacenter replies independently, and KeyringResponses is used as a // container for the set of all responses. diff --git a/api/go.mod b/api/go.mod index 348ad8a738..6a37c10dd6 100644 --- a/api/go.mod +++ b/api/go.mod @@ -10,7 +10,7 @@ require ( github.com/hashicorp/go-hclog v0.12.0 github.com/hashicorp/go-rootcerts v1.0.2 github.com/hashicorp/go-uuid v1.0.1 - github.com/hashicorp/serf v0.9.5 + github.com/hashicorp/serf v0.9.6 github.com/mitchellh/mapstructure v1.1.2 github.com/stretchr/testify v1.4.0 ) diff --git a/api/go.sum b/api/go.sum index b95bd47444..16706f53fd 100644 --- a/api/go.sum +++ b/api/go.sum @@ -36,11 +36,11 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= -github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg2DmyNY= -github.com/hashicorp/memberlist v0.2.2 h1:5+RffWKwqJ71YPu9mWsF7ZOscZmwfasdA8kbdC7AO2g= -github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/hashicorp/serf v0.9.5 h1:EBWvyu9tcRszt3Bxp3KNssBMP1KuHWyO51lz9+786iM= -github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= +github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= +github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA= +github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc= +github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -56,9 +56,9 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= -github.com/miekg/dns v1.1.26 h1:gPxPSwALAeHJSjarOs00QjVdV9QoBvc1D2ujQUr5BzU= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= +github.com/miekg/dns v1.1.41 h1:WMszZWJG0XmzbK9FEmzH2TVcqYzFesusSIB41b8KHxY= +github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= @@ -83,20 +83,18 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392 h1:ACG4HJsFiNMf47Y4PeRoebLNy/2lXT9EtprMuTFWt1M= golang.org/x/crypto v0.0.0-20190923035154-9ee001bba392/go.mod h1:/lpIB1dKB+9EgE3H3cr1v9wB50oz8l4C4h62xy7jSTY= -golang.org/x/net v0.0.0-20181023162649-9b4f9f5ad519/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1 h1:4qWs8cYYH6PoEFy4dfhDFgoMGkwAcETd+MmPdCPMzUc= +golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190922100055-0a153f010e69/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -105,10 +103,16 @@ golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/api/operator_keyring.go b/api/operator_keyring.go index 15f319bb11..6db31a252b 100644 --- a/api/operator_keyring.go +++ b/api/operator_keyring.go @@ -16,6 +16,9 @@ type KeyringResponse struct { // Segment has the network segment this request corresponds to. Segment string + // Partition has the admin partition this request corresponds to. + Partition string `json:",omitempty"` + // Messages has information or errors from serf Messages map[string]string `json:",omitempty"` diff --git a/command/forceleave/forceleave_test.go b/command/forceleave/forceleave_test.go index e138088548..f69059ca05 100644 --- a/command/forceleave/forceleave_test.go +++ b/command/forceleave/forceleave_test.go @@ -18,7 +18,6 @@ func TestForceLeaveCommand_noTabs(t *testing.T) { } } -// TODO(partitions): split this test and verify it works in partitions func TestForceLeaveCommand(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") @@ -62,7 +61,6 @@ func TestForceLeaveCommand(t *testing.T) { }) } -// TODO(partitions): split this test and verify it works in partitions func TestForceLeaveCommand_NoNodeWithName(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/command/join/join_test.go b/command/join/join_test.go index 555014a5df..48b89b32ea 100644 --- a/command/join/join_test.go +++ b/command/join/join_test.go @@ -16,8 +16,7 @@ func TestJoinCommand_noTabs(t *testing.T) { } } -// TODO(partitions): split this test and verify it works in partitions -func TestJoinCommandJoin_lan(t *testing.T) { +func TestJoinCommandJoin_LAN(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") } diff --git a/command/keyring/keyring.go b/command/keyring/keyring.go index acdb0476f7..6c73c74297 100644 --- a/command/keyring/keyring.go +++ b/command/keyring/keyring.go @@ -5,10 +5,12 @@ import ( "fmt" "strings" + "github.com/mitchellh/cli" + "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/agent/structs" consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/command/flags" - "github.com/mitchellh/cli" ) func New(ui cli.Ui) *cmd { @@ -185,21 +187,25 @@ func (c *cmd) Run(args []string) int { func formatResponse(response *consulapi.KeyringResponse, keys map[string]int) string { b := new(strings.Builder) b.WriteString("\n") - b.WriteString(poolName(response.Datacenter, response.WAN, response.Segment)) + b.WriteString(poolName(response.Datacenter, response.WAN, response.Partition, response.Segment)) b.WriteString(formatMessages(response.Messages)) b.WriteString(formatKeys(keys, response.NumNodes)) return strings.TrimRight(b.String(), "\n") } -func poolName(dc string, wan bool, segment string) string { +func poolName(dc string, wan bool, partition, segment string) string { pool := fmt.Sprintf("%s (LAN)", dc) if wan { pool = "WAN" } + + var suffix string if segment != "" { - segment = fmt.Sprintf(" [%s]", segment) + suffix = fmt.Sprintf(" [%s]", segment) + } else if !structs.IsDefaultPartition(partition) { + suffix = fmt.Sprintf(" [partition: %s]", partition) } - return fmt.Sprintf("%s%s:\n", pool, segment) + return fmt.Sprintf("%s%s:\n", pool, suffix) } func formatMessages(messages map[string]string) string { diff --git a/command/keyring/keyring_test.go b/command/keyring/keyring_test.go index a9f6d396f5..166129fe00 100644 --- a/command/keyring/keyring_test.go +++ b/command/keyring/keyring_test.go @@ -4,10 +4,11 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent" - consulapi "github.com/hashicorp/consul/api" "github.com/mitchellh/cli" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent" + consulapi "github.com/hashicorp/consul/api" ) func TestKeyringCommand_noTabs(t *testing.T) { @@ -195,9 +196,8 @@ func removeKey(t *testing.T, addr string, key string) { } func TestKeyringCommand_poolName(t *testing.T) { - require.Equal(t, "dc1 (LAN):\n", poolName("dc1", false, "")) - require.Equal(t, "dc1 (LAN) [segment1]:\n", poolName("dc1", false, "segment1")) - require.Equal(t, "WAN:\n", poolName("dc1", true, "")) + require.Equal(t, "dc1 (LAN):\n", poolName("dc1", false, "", "")) + require.Equal(t, "WAN:\n", poolName("dc1", true, "", "")) } func TestKeyringCommand_formatKeys(t *testing.T) { diff --git a/command/members/members_test.go b/command/members/members_test.go index 7f8a6479db..cc4a21742a 100644 --- a/command/members/members_test.go +++ b/command/members/members_test.go @@ -1,6 +1,7 @@ package members import ( + "encoding/csv" "fmt" "math/rand" "sort" @@ -175,6 +176,35 @@ func TestMembersCommand_verticalBar(t *testing.T) { } } +func decodeOutput(t *testing.T, data string) []map[string]string { + r := csv.NewReader(strings.NewReader(data)) + r.Comma = ' ' + r.TrimLeadingSpace = true + + lines, err := r.ReadAll() + require.NoError(t, err) + if len(lines) < 2 { + return nil + } + + var out []map[string]string + for i := 1; i < len(lines); i++ { + m := zip(t, lines[0], lines[i]) + out = append(out, m) + } + return out +} + +func zip(t *testing.T, k, v []string) map[string]string { + require.Equal(t, len(k), len(v)) + + m := make(map[string]string) + for i := 0; i < len(k); i++ { + m[k[i]] = v[i] + } + return m +} + func TestSortByMemberNamePartitionAndSegment(t *testing.T) { lib.SeedMathRand() diff --git a/go.mod b/go.mod index a697ac7ab6..6392247c89 100644 --- a/go.mod +++ b/go.mod @@ -49,13 +49,12 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 - github.com/hashicorp/mdns v1.0.4 // indirect - github.com/hashicorp/memberlist v0.2.4 + github.com/hashicorp/memberlist v0.3.0 github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 github.com/hashicorp/raft v1.3.2 github.com/hashicorp/raft-autopilot v0.1.5 github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea - github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 + github.com/hashicorp/serf v0.9.6 github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267 github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493 diff --git a/go.sum b/go.sum index abf4f8ecb0..18d3e92270 100644 --- a/go.sum +++ b/go.sum @@ -278,9 +278,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg2DmyNY= github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= -github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= -github.com/hashicorp/memberlist v0.2.4 h1:OOhYzSvFnkFQXm1ysE8RjXTHsqSRDyP4emusC9K7DYg= -github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA= +github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE= github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= @@ -291,9 +290,8 @@ github.com/hashicorp/raft-autopilot v0.1.5 h1:onEfMH5uHVdXQqtas36zXUHEZxLdsJVu/n github.com/hashicorp/raft-autopilot v0.1.5/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= -github.com/hashicorp/serf v0.9.5/go.mod h1:UWDWwZeL5cuWDJdl0C6wrvrUwEqtQ4ZKBKKENpqIUyk= -github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9 h1:lCZfMBDn/Puwg9VosHMf/9p9jNDYYkbzVjb4jYjVfqU= -github.com/hashicorp/serf v0.9.6-0.20210609195804-2b5dd0cd2de9/go.mod h1:qapjppkpNXHYTyzx+HqkyWGGkmUxafHjuspm/Bqb2Jc= +github.com/hashicorp/serf v0.9.6 h1:uuEX1kLR6aoda1TBttmJQKDLZE1Ob7KN0NPdE7EtCDc= +github.com/hashicorp/serf v0.9.6/go.mod h1:TXZNMjZQijwlDvp+r0b63xZ45H7JmCmgg4gpTwn9UV4= github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086 h1:OKsyxKi2sNmqm1Gv93adf2AID2FOBFdCbbZn9fGtIdg= github.com/hashicorp/vault/api v1.0.5-0.20200717191844-f687267c8086/go.mod h1:R3Umvhlxi2TN7Ex2hzOowyeNb+SfbVWI973N+ctaFMk= github.com/hashicorp/vault/sdk v0.1.14-0.20200519221838-e0cfd64bc267 h1:e1ok06zGrWJW91rzRroyl5nRNqraaBe4d5hiKcVZuHM=