diff --git a/.changelog/11429.txt b/.changelog/11429.txt new file mode 100644 index 0000000000..bff9f6c3a8 --- /dev/null +++ b/.changelog/11429.txt @@ -0,0 +1,3 @@ +```release-note:improvement +agent: refactor the agent delegate interface to be partition friendly +``` diff --git a/agent/acl.go b/agent/acl.go index 5130a608d8..fd196c84c6 100644 --- a/agent/acl.go +++ b/agent/acl.go @@ -99,7 +99,7 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru } } else { if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow { - return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.agentEnterpriseMeta())) + return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta())) } } @@ -111,7 +111,7 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru } } else { if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow { - return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.agentEnterpriseMeta())) + return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta())) } } } @@ -131,7 +131,7 @@ func (a *Agent) vetCheckUpdateWithAuthorizer(authz acl.Authorizer, checkID struc } } else { if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow { - return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.agentEnterpriseMeta())) + return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta())) } } } else { diff --git a/agent/acl_test.go b/agent/acl_test.go index a97b9dbd98..e3b3f5ca5f 100644 --- a/agent/acl_test.go +++ b/agent/acl_test.go @@ -128,25 +128,21 @@ func (a *TestACLAgent) GetLANCoordinate() (lib.CoordinateSet, error) { func (a *TestACLAgent) Leave() error { return fmt.Errorf("Unimplemented") } -func (a *TestACLAgent) LANMembers() []serf.Member { +func (a *TestACLAgent) LANMembersInAgentPartition() []serf.Member { return nil } -func (a *TestACLAgent) LANMembersAllSegments() ([]serf.Member, error) { +func (a *TestACLAgent) LANMembers(f consul.LANMemberFilter) ([]serf.Member, error) { return nil, fmt.Errorf("Unimplemented") } -func (a *TestACLAgent) LANSegmentMembers(segment string) ([]serf.Member, error) { - return nil, fmt.Errorf("Unimplemented") -} -func (a *TestACLAgent) LocalMember() serf.Member { +func (a *TestACLAgent) AgentLocalMember() serf.Member { return serf.Member{} } -func (a *TestACLAgent) JoinLAN(addrs []string) (n int, err error) { +func (a *TestACLAgent) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) { return 0, fmt.Errorf("Unimplemented") } -func (a *TestACLAgent) RemoveFailedNode(node string, prune bool) error { +func (a *TestACLAgent) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error { return fmt.Errorf("Unimplemented") } - func (a *TestACLAgent) RPC(method string, args interface{}, reply interface{}) error { return fmt.Errorf("Unimplemented") } diff --git a/agent/agent.go b/agent/agent.go index 5464994b20..68103decf0 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -124,14 +124,35 @@ func ConfigSourceFromName(name string) (configSource, bool) { // delegate defines the interface shared by both // consul.Client and consul.Server. type delegate interface { - GetLANCoordinate() (lib.CoordinateSet, error) + // Leave is used to prepare for a graceful shutdown. Leave() error - LANMembers() []serf.Member - LANMembersAllSegments() ([]serf.Member, error) - LANSegmentMembers(segment string) ([]serf.Member, error) - LocalMember() serf.Member - JoinLAN(addrs []string) (n int, err error) - RemoveFailedNode(node string, prune bool) error + + // AgentLocalMember is used to retrieve the LAN member for the local node. + AgentLocalMember() serf.Member + + // LANMembersInAgentPartition returns the LAN members for this agent's + // canonical serf pool. For clients this is the only pool that exists. For + // servers it's the pool in the default segment and the default partition. + LANMembersInAgentPartition() []serf.Member + + // LANMembers returns the LAN members for one of: + // + // - the requested partition + // - the requested segment + // - all segments + // + // This is limited to segments and partitions that the node is a member of. + LANMembers(f consul.LANMemberFilter) ([]serf.Member, error) + + // GetLANCoordinate returns the coordinate of the node in the LAN gossip pool. + GetLANCoordinate() (lib.CoordinateSet, error) + + // JoinLAN is used to have Consul join the inner-DC pool The target address + // should be another node inside the DC listening on the Serf LAN address + JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) + + // RemoveFailedNode is used to remove a failed node from the cluster. + RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error // TODO: replace this method with consul.ACLResolver ResolveTokenToIdentity(token string) (structs.ACLIdentity, error) @@ -515,8 +536,11 @@ func (a *Agent) Start(ctx context.Context) error { a.delegate = client } - // the staggering of the state syncing depends on the cluster size. - a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) } + // The staggering of the state syncing depends on the cluster size. + // + // NOTE: we will use the agent's canonical serf pool for this since that's + // similarly scoped with the state store side of anti-entropy. + a.sync.ClusterSize = func() int { return len(a.delegate.LANMembersInAgentPartition()) } // link the state with the consul server/client and the state syncer // via callbacks. After several attempts this was easier than using @@ -1443,9 +1467,9 @@ func (a *Agent) ShutdownCh() <-chan struct{} { } // JoinLAN is used to have the agent join a LAN cluster -func (a *Agent) JoinLAN(addrs []string) (n int, err error) { +func (a *Agent) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) { a.logger.Info("(LAN) joining", "lan_addresses", addrs) - n, err = a.delegate.JoinLAN(addrs) + n, err = a.delegate.JoinLAN(addrs, entMeta) if err == nil { a.logger.Info("(LAN) joined", "number_of_nodes", n) if a.joinLANNotifier != nil { @@ -1510,12 +1534,13 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error { } // ForceLeave is used to remove a failed node from the cluster -func (a *Agent) ForceLeave(node string, prune bool) (err error) { +func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) { a.logger.Info("Force leaving node", "node", node) + // TODO(partitions): merge IsMember into the RemoveFailedNode call. if ok := a.IsMember(node); !ok { return fmt.Errorf("agent: No node found with name '%s'", node) } - err = a.delegate.RemoveFailedNode(node, prune) + err = a.delegate.RemoveFailedNode(node, prune, entMeta) if err != nil { a.logger.Warn("Failed to remove node", "node", node, @@ -1525,20 +1550,19 @@ func (a *Agent) ForceLeave(node string, prune bool) (err error) { return err } -// LocalMember is used to return the local node -func (a *Agent) LocalMember() serf.Member { - return a.delegate.LocalMember() +// AgentLocalMember is used to retrieve the LAN member for the local node. +func (a *Agent) AgentLocalMember() serf.Member { + return a.delegate.AgentLocalMember() } -// LANMembers is used to retrieve the LAN members -func (a *Agent) LANMembers() []serf.Member { - // TODO(partitions): filter this by the partition? - return a.delegate.LANMembers() +// LANMembersInAgentPartition is used to retrieve the LAN members for this +// agent's partition. +func (a *Agent) LANMembersInAgentPartition() []serf.Member { + return a.delegate.LANMembersInAgentPartition() } // WANMembers is used to retrieve the WAN members func (a *Agent) WANMembers() []serf.Member { - // TODO(partitions): filter this by the partition by omitting wan results for now? if srv, ok := a.delegate.(*consul.Server); ok { return srv.WANMembers() } @@ -1548,7 +1572,7 @@ func (a *Agent) WANMembers() []serf.Member { // IsMember is used to check if a node with the given nodeName // is a member func (a *Agent) IsMember(nodeName string) bool { - for _, m := range a.LANMembers() { + for _, m := range a.LANMembersInAgentPartition() { if m.Name == nodeName { return true } @@ -1626,12 +1650,12 @@ OUTER: for { rate := a.config.SyncCoordinateRateTarget min := a.config.SyncCoordinateIntervalMin - intv := lib.RateScaledInterval(rate, min, len(a.LANMembers())) + intv := lib.RateScaledInterval(rate, min, len(a.LANMembersInAgentPartition())) intv = intv + lib.RandomStagger(intv) select { case <-time.After(intv): - members := a.LANMembers() + members := a.LANMembersInAgentPartition() grok, err := consul.CanServersUnderstandProtocol(members, 3) if err != nil { a.logger.Error("Failed to check servers", "error", err) @@ -1655,7 +1679,7 @@ OUTER: Node: a.config.NodeName, Segment: segment, Coord: coord, - EnterpriseMeta: *a.agentEnterpriseMeta(), + EnterpriseMeta: *a.AgentEnterpriseMeta(), WriteRequest: structs.WriteRequest{Token: agentToken}, } var reply struct{} @@ -2732,7 +2756,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType, var rpcReq structs.NodeSpecificRequest rpcReq.Datacenter = a.config.Datacenter - rpcReq.EnterpriseMeta = *a.agentEnterpriseMeta() + rpcReq.EnterpriseMeta = *a.AgentEnterpriseMeta() // The token to set is really important. The behavior below follows // the same behavior as anti-entropy: we use the user-specified token diff --git a/agent/agent_endpoint.go b/agent/agent_endpoint.go index c79a0af1b1..d97fdcb385 100644 --- a/agent/agent_endpoint.go +++ b/agent/agent_endpoint.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/consul/acl" cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/debug" "github.com/hashicorp/consul/agent/structs" token_store "github.com/hashicorp/consul/agent/token" @@ -58,7 +59,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -104,7 +105,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i Config: config, DebugConfig: s.agent.config.Sanitized(), Coord: cs[s.agent.config.SegmentName], - Member: s.agent.LocalMember(), + Member: s.agent.AgentLocalMember(), Stats: s.agent.Stats(), Meta: s.agent.State.Metadata(), XDS: xds, @@ -148,7 +149,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request) // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -183,7 +184,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -236,7 +237,7 @@ func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request) // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -520,12 +521,16 @@ func (s *HTTPHandlers) AgentMembers(resp http.ResponseWriter, req *http.Request) if wan { members = s.agent.WANMembers() } else { - var err error + filter := consul.LANMemberFilter{ + // TODO(partitions): insert the partition from the request + } if segment == api.AllSegments { - members, err = s.agent.delegate.LANMembersAllSegments() + filter.AllSegments = true } else { - members, err = s.agent.delegate.LANSegmentMembers(segment) + filter.Segment = segment } + var err error + members, err = s.agent.delegate.LANMembers(filter) if err != nil { return nil, err } @@ -547,7 +552,7 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -568,7 +573,8 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i } _, err = s.agent.JoinWAN([]string{addr}) } else { - _, err = s.agent.JoinLAN([]string{addr}) + // TODO(partitions): use the request entmeta + _, err = s.agent.JoinLAN([]string{addr}, nil) } return nil, err } @@ -584,7 +590,7 @@ func (s *HTTPHandlers) AgentLeave(resp http.ResponseWriter, req *http.Request) ( // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -612,7 +618,8 @@ func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Reque _, prune := req.URL.Query()["prune"] addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/") - return nil, s.agent.ForceLeave(addr, prune) + // TODO(partitions): use the request entmeta + return nil, s.agent.ForceLeave(addr, prune, nil) } // syncChanges is a helper function which wraps a blocking call to sync @@ -1274,7 +1281,7 @@ func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http. } var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.NodeWrite(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -1299,7 +1306,7 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request) // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } @@ -1382,7 +1389,7 @@ func (s *HTTPHandlers) AgentToken(resp http.ResponseWriter, req *http.Request) ( // Authorize using the agent's own enterprise meta, not the token. var authzContext acl.AuthorizerContext - s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext) + s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext) if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow { return nil, acl.ErrPermissionDenied } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index 2c2ca91238..8f826ade87 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -1891,12 +1891,12 @@ func TestAgent_Join(t *testing.T) { t.Fatalf("Err: %v", obj) } - if len(a1.LANMembers()) != 2 { + if len(a1.LANMembersInAgentPartition()) != 2 { t.Fatalf("should have 2 members") } retry.Run(t, func(r *retry.R) { - if got, want := len(a2.LANMembers()), 2; got != want { + if got, want := len(a2.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d LAN members want %d", got, want) } }) @@ -2002,7 +2002,7 @@ func TestAgent_JoinLANNotify(t *testing.T) { a1.joinLANNotifier = notif addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) - _, err := a1.JoinLAN([]string{addr}) + _, err := a1.JoinLAN([]string{addr}, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -2030,7 +2030,7 @@ func TestAgent_Leave(t *testing.T) { // Join first addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) - _, err := a1.JoinLAN([]string{addr}) + _, err := a1.JoinLAN([]string{addr}, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -2045,7 +2045,7 @@ func TestAgent_Leave(t *testing.T) { t.Fatalf("Err: %v", obj) } retry.Run(t, func(r *retry.R) { - m := a1.LANMembers() + m := a1.LANMembersInAgentPartition() if got, want := m[1].Status, serf.StatusLeft; got != want { r.Fatalf("got status %q want %q", got, want) } @@ -2101,7 +2101,7 @@ func TestAgent_ForceLeave(t *testing.T) { // Join first addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) - _, err := a1.JoinLAN([]string{addr}) + _, err := a1.JoinLAN([]string{addr}, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -2110,7 +2110,7 @@ func TestAgent_ForceLeave(t *testing.T) { a2.Shutdown() // Wait for agent being marked as failed, so we wait for full shutdown of Agent retry.Run(t, func(r *retry.R) { - m := a1.LANMembers() + m := a1.LANMembersInAgentPartition() if got, want := m[1].Status, serf.StatusFailed; got != want { r.Fatalf("got status %q want %q", got, want) } @@ -2126,7 +2126,7 @@ func TestAgent_ForceLeave(t *testing.T) { t.Fatalf("Err: %v", obj) } retry.Run(t, func(r *retry.R) { - m := a1.LANMembers() + m := a1.LANMembersInAgentPartition() if got, want := m[1].Status, serf.StatusLeft; got != want { r.Fatalf("got status %q want %q", got, want) } @@ -2210,7 +2210,7 @@ func TestAgent_ForceLeavePrune(t *testing.T) { // Join first addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN) - _, err := a1.JoinLAN([]string{addr}) + _, err := a1.JoinLAN([]string{addr}, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -2219,7 +2219,7 @@ func TestAgent_ForceLeavePrune(t *testing.T) { a2.Shutdown() // Wait for agent being marked as failed, so we wait for full shutdown of Agent retry.Run(t, func(r *retry.R) { - m := a1.LANMembers() + m := a1.LANMembersInAgentPartition() for _, member := range m { if member.Name == a2.Config.NodeName { if member.Status != serf.StatusFailed { @@ -2239,7 +2239,7 @@ func TestAgent_ForceLeavePrune(t *testing.T) { t.Fatalf("Err: %v", obj) } retry.Run(t, func(r *retry.R) { - m := len(a1.LANMembers()) + m := len(a1.LANMembersInAgentPartition()) if m != 1 { r.Fatalf("want one member, got %v", m) } diff --git a/agent/agent_oss.go b/agent/agent_oss.go index 88496e28be..d266b2f524 100644 --- a/agent/agent_oss.go +++ b/agent/agent_oss.go @@ -51,6 +51,6 @@ func (a *Agent) enterpriseStats() map[string]map[string]string { return nil } -func (a *Agent) agentEnterpriseMeta() *structs.EnterpriseMeta { +func (a *Agent) AgentEnterpriseMeta() *structs.EnterpriseMeta { return structs.NodeEnterpriseMetaInDefaultPartition() } diff --git a/agent/agent_test.go b/agent/agent_test.go index 6638fa35cf..79d441e2fd 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -4500,7 +4500,7 @@ services { // Now connect to server _, err := a1.JoinLAN([]string{ fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN), - }) + }, nil) require.NoError(t, err) t.Logf("joined client to server") diff --git a/agent/consul/auto_config_backend.go b/agent/consul/auto_config_backend.go index fa6b1e1689..3274000d14 100644 --- a/agent/consul/auto_config_backend.go +++ b/agent/consul/auto_config_backend.go @@ -32,7 +32,10 @@ func (b autoConfigBackend) GetCARoots() (*structs.IndexedCARoots, error) { // DatacenterJoinAddresses will return all the strings suitable for usage in // retry join operations to connect to the the LAN or LAN segment gossip pool. func (b autoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) { - members, err := b.Server.LANSegmentMembers(segment) + members, err := b.Server.LANMembers(LANMemberFilter{ + Segment: segment, + Partition: "", // TODO(partitions): figure out what goes here + }) if err != nil { return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err) } diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go index 22663f00be..8d17e49485 100644 --- a/agent/consul/autopilot.go +++ b/agent/consul/autopilot.go @@ -66,8 +66,9 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) { } func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) { + serverEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition() go func() { - if err := d.server.RemoveFailedNode(srv.Name, false); err != nil { + if err := d.server.RemoveFailedNode(srv.Name, false, serverEntMeta); err != nil { d.server.logger.Error("failed to remove server", "name", srv.Name, "id", srv.ID, "error", err) } }() diff --git a/agent/consul/autopilot_test.go b/agent/consul/autopilot_test.go index 3acddc4407..8b26324112 100644 --- a/agent/consul/autopilot_test.go +++ b/agent/consul/autopilot_test.go @@ -101,7 +101,7 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) { retry.Run(t, func(r *retry.R) { alive := 0 - for _, m := range servers[leaderIndex].LANMembers() { + for _, m := range servers[leaderIndex].LANMembersInAgentPartition() { if m.Status == serf.StatusAlive { alive++ } @@ -498,7 +498,7 @@ func TestAutopilot_MinQuorum(t *testing.T) { if leader == nil { r.Fatalf("no members set") } - for _, m := range leader.LANMembers() { + for _, m := range leader.LANMembersInAgentPartition() { if m.Name == dead.config.NodeName && m.Status != serf.StatusLeft { r.Fatalf("%v should be left, got %v", m.Name, m.Status.String()) } @@ -515,7 +515,7 @@ func TestAutopilot_MinQuorum(t *testing.T) { if leader == nil { r.Fatalf("no members set") } - for _, m := range leader.LANMembers() { + for _, m := range leader.LANMembersInAgentPartition() { if m.Name == dead.config.NodeName && m.Status != serf.StatusFailed { r.Fatalf("%v should be failed, got %v", m.Name, m.Status.String()) } diff --git a/agent/consul/client.go b/agent/consul/client.go index f3e337e12e..a2abc7fb76 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -177,7 +177,7 @@ func (c *Client) Shutdown() error { return nil } -// Leave is used to prepare for a graceful shutdown +// Leave is used to prepare for a graceful shutdown. func (c *Client) Leave() error { c.logger.Info("client starting leave") @@ -190,40 +190,49 @@ func (c *Client) Leave() error { return nil } -// JoinLAN is used to have Consul client join the inner-DC pool -// The target address should be another node inside the DC -// listening on the Serf LAN address -func (c *Client) JoinLAN(addrs []string) (int, error) { +// JoinLAN is used to have Consul join the inner-DC pool The target address +// should be another node inside the DC listening on the Serf LAN address +func (c *Client) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) { + // TODO(partitions): assert that the partitions match return c.serf.Join(addrs, true) } -// LocalMember is used to return the local node -func (c *Client) LocalMember() serf.Member { +// AgentLocalMember is used to retrieve the LAN member for the local node. +func (c *Client) AgentLocalMember() serf.Member { return c.serf.LocalMember() } -// LANMembers is used to return the members of the LAN cluster -func (c *Client) LANMembers() []serf.Member { +// LANMembersInAgentPartition returns the LAN members for this agent's +// canonical serf pool. For clients this is the only pool that exists. For +// servers it's the pool in the default segment and the default partition. +func (c *Client) LANMembersInAgentPartition() []serf.Member { return c.serf.Members() } -// LANMembersAllSegments returns members from all segments. -func (c *Client) LANMembersAllSegments() ([]serf.Member, error) { - return c.serf.Members(), nil -} +// LANMembers returns the LAN members for one of: +// +// - the requested partition +// - the requested segment +// - all segments +// +// This is limited to segments and partitions that the node is a member of. +func (c *Client) LANMembers(filter LANMemberFilter) ([]serf.Member, error) { + if err := filter.Validate(); err != nil { + return nil, err + } + + // TODO(partitions): assert that the partitions match -// LANSegmentMembers only returns our own segment's members, because clients -// can't be in multiple segments. -func (c *Client) LANSegmentMembers(segment string) ([]serf.Member, error) { - if segment == c.config.Segment { - return c.LANMembers(), nil + if !filter.AllSegments && filter.Segment != c.config.Segment { + return nil, fmt.Errorf("segment %q not found", filter.Segment) } - return nil, fmt.Errorf("segment %q not found", segment) + return c.serf.Members(), nil } -// RemoveFailedNode is used to remove a failed node from the cluster -func (c *Client) RemoveFailedNode(node string, prune bool) error { +// RemoveFailedNode is used to remove a failed node from the cluster. +func (c *Client) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error { + // TODO(partitions): assert that the partitions match if prune { return c.serf.RemoveFailedNodePrune(node) } @@ -362,9 +371,9 @@ func (c *Client) Stats() map[string]map[string]string { return stats } -// GetLANCoordinate returns the network coordinate of the current node, as -// maintained by Serf. +// GetLANCoordinate returns the coordinate of the node in the LAN gossip pool. func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) { + // TODO(partitions): possibly something here lan, err := c.serf.GetCoordinate() if err != nil { return nil, err diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 4b8ba66801..d6ae206bca 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -124,10 +124,10 @@ func TestClient_JoinLAN(t *testing.T) { if got, want := c1.router.GetLANManager().NumServers(), 1; got != want { r.Fatalf("got %d servers want %d", got, want) } - if got, want := len(s1.LANMembers()), 2; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d server LAN members want %d", got, want) } - if got, want := len(c1.LANMembers()), 2; got != want { + if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d client LAN members want %d", got, want) } }) @@ -157,8 +157,8 @@ func TestClient_LANReap(t *testing.T) { testrpc.WaitForLeader(t, c1.RPC, "dc1") retry.Run(t, func(r *retry.R) { - require.Len(r, s1.LANMembers(), 2) - require.Len(r, c1.LANMembers(), 2) + require.Len(r, s1.LANMembersInAgentPartition(), 2) + require.Len(r, c1.LANMembersInAgentPartition(), 2) }) // Check the router has both @@ -172,7 +172,7 @@ func TestClient_LANReap(t *testing.T) { s1.Shutdown() retry.Run(t, func(r *retry.R) { - require.Len(r, c1.LANMembers(), 1) + require.Len(r, c1.LANMembersInAgentPartition(), 1) server := c1.router.FindLANServer() require.Nil(t, server) }) @@ -193,15 +193,15 @@ func TestClient_JoinLAN_Invalid(t *testing.T) { defer c1.Shutdown() // Try to join - if _, err := c1.JoinLAN([]string{joinAddrLAN(s1)}); err == nil { + if _, err := c1.JoinLAN([]string{joinAddrLAN(s1)}, nil); err == nil { t.Fatal("should error") } time.Sleep(50 * time.Millisecond) - if len(s1.LANMembers()) != 1 { + if len(s1.LANMembersInAgentPartition()) != 1 { t.Fatalf("should not join") } - if len(c1.LANMembers()) != 1 { + if len(c1.LANMembersInAgentPartition()) != 1 { t.Fatalf("should not join") } } @@ -217,7 +217,7 @@ func TestClient_JoinWAN_Invalid(t *testing.T) { defer c1.Shutdown() // Try to join - if _, err := c1.JoinLAN([]string{joinAddrWAN(s1)}); err == nil { + if _, err := c1.JoinLAN([]string{joinAddrWAN(s1)}, nil); err == nil { t.Fatal("should error") } @@ -225,7 +225,7 @@ func TestClient_JoinWAN_Invalid(t *testing.T) { if len(s1.WANMembers()) != 1 { t.Fatalf("should not join") } - if len(c1.LANMembers()) != 1 { + if len(c1.LANMembersInAgentPartition()) != 1 { t.Fatalf("should not join") } } @@ -250,11 +250,11 @@ func TestClient_RPC(t *testing.T) { joinLAN(t, c1, s1) // Check the members - if len(s1.LANMembers()) != 2 { + if len(s1.LANMembersInAgentPartition()) != 2 { t.Fatalf("bad len") } - if len(c1.LANMembers()) != 2 { + if len(c1.LANMembersInAgentPartition()) != 2 { t.Fatalf("bad len") } @@ -354,10 +354,10 @@ func TestClient_RPC_Pool(t *testing.T) { // Wait for both agents to finish joining retry.Run(t, func(r *retry.R) { - if got, want := len(s1.LANMembers()), 2; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d server LAN members want %d", got, want) } - if got, want := len(c1.LANMembers()), 2; got != want { + if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d client LAN members want %d", got, want) } }) @@ -418,12 +418,12 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) { c.router.GetLANManager().ResetRebalanceTimer() time.Sleep(time.Second) - if len(c.LANMembers()) != numServers+numClients { - t.Errorf("bad len: %d", len(c.LANMembers())) + if len(c.LANMembersInAgentPartition()) != numServers+numClients { + t.Errorf("bad len: %d", len(c.LANMembersInAgentPartition())) } for _, s := range servers { - if len(s.LANMembers()) != numServers+numClients { - t.Errorf("bad len: %d", len(s.LANMembers())) + if len(s.LANMembersInAgentPartition()) != numServers+numClients { + t.Errorf("bad len: %d", len(s.LANMembersInAgentPartition())) } } @@ -476,10 +476,10 @@ func TestClient_RPC_TLS(t *testing.T) { // Wait for joins to finish/RPC to succeed retry.Run(t, func(r *retry.R) { - if got, want := len(s1.LANMembers()), 2; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d server LAN members want %d", got, want) } - if got, want := len(c1.LANMembers()), 2; got != want { + if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d client LAN members want %d", got, want) } if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil { @@ -684,10 +684,10 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) { // Try to join. joinLAN(t, c1, s1) retry.Run(t, func(r *retry.R) { - if got, want := len(s1.LANMembers()), 2; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d server members want %d", got, want) } - if got, want := len(c1.LANMembers()), 2; got != want { + if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d client members want %d", got, want) } @@ -746,10 +746,10 @@ func TestClientServer_UserEvent(t *testing.T) { // Check the members retry.Run(t, func(r *retry.R) { - if got, want := len(s1.LANMembers()), 2; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d server LAN members want %d", got, want) } - if got, want := len(c1.LANMembers()), 2; got != want { + if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d client LAN members want %d", got, want) } }) @@ -845,8 +845,8 @@ func TestClient_ShortReconnectTimeout(t *testing.T) { // up to 10x the time in the case of slow CI. require.Eventually(t, func() bool { - return len(cluster.Servers[0].LANMembers()) == 2 && - len(cluster.Clients[0].LANMembers()) == 2 + return len(cluster.Servers[0].LANMembersInAgentPartition()) == 2 && + len(cluster.Clients[0].LANMembersInAgentPartition()) == 2 }, time.Second, diff --git a/agent/consul/helper_test.go b/agent/consul/helper_test.go index e8477bd001..11309b1f79 100644 --- a/agent/consul/helper_test.go +++ b/agent/consul/helper_test.go @@ -108,8 +108,8 @@ func joinAddrWAN(s *Server) string { } type clientOrServer interface { - JoinLAN(addrs []string) (int, error) - LANMembers() []serf.Member + JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) + LANMembersInAgentPartition() []serf.Member } // joinLAN is a convenience function for @@ -129,15 +129,15 @@ func joinLAN(t *testing.T, member clientOrServer, leader *Server) { memberAddr = fmt.Sprintf("127.0.0.1:%d", x.config.SerfLANConfig.MemberlistConfig.BindPort) } leaderAddr := joinAddrLAN(leader) - if _, err := member.JoinLAN([]string{leaderAddr}); err != nil { + if _, err := member.JoinLAN([]string{leaderAddr}, nil); err != nil { t.Fatal(err) } retry.Run(t, func(r *retry.R) { - if !seeEachOther(leader.LANMembers(), member.LANMembers(), leaderAddr, memberAddr) { + if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) { r.Fatalf("leader and member cannot see each other on LAN") } }) - if !seeEachOther(leader.LANMembers(), member.LANMembers(), leaderAddr, memberAddr) { + if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) { t.Fatalf("leader and member cannot see each other on LAN") } } diff --git a/agent/consul/leader_test.go b/agent/consul/leader_test.go index c6544148d5..635999b33c 100644 --- a/agent/consul/leader_test.go +++ b/agent/consul/leader_test.go @@ -249,7 +249,7 @@ func TestLeader_ReapMember(t *testing.T) { }) // Simulate a node reaping - mems := s1.LANMembers() + mems := s1.LANMembersInAgentPartition() var c1mem serf.Member for _, m := range mems { if m.Name == c1.config.NodeName { @@ -688,7 +688,7 @@ func TestLeader_LeftServer(t *testing.T) { servers[0].Shutdown() // Force remove the non-leader (transition to left state) - if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false); err != nil { + if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil { t.Fatalf("err: %v", err) } @@ -1045,7 +1045,7 @@ func TestLeader_ChangeServerID(t *testing.T) { retry.Run(t, func(r *retry.R) { alive := 0 - for _, m := range s1.LANMembers() { + for _, m := range s1.LANMembersInAgentPartition() { if m.Status == serf.StatusAlive { alive++ } @@ -1118,10 +1118,10 @@ func TestLeader_ChangeNodeID(t *testing.T) { // Shut down a server, freeing up its address/port s3.Shutdown() - // wait for s1.LANMembers() to show s3 as StatusFailed or StatusLeft on + // wait for s1.LANMembersInAgentPartition() to show s3 as StatusFailed or StatusLeft on retry.Run(t, func(r *retry.R) { var gone bool - for _, m := range s1.LANMembers() { + for _, m := range s1.LANMembersInAgentPartition() { if m.Name == s3.config.NodeName && (m.Status == serf.StatusFailed || m.Status == serf.StatusLeft) { gone = true } @@ -1149,7 +1149,7 @@ func TestLeader_ChangeNodeID(t *testing.T) { }) retry.Run(t, func(r *retry.R) { - for _, m := range s1.LANMembers() { + for _, m := range s1.LANMembersInAgentPartition() { require.Equal(r, serf.StatusAlive, m.Status) } }) diff --git a/agent/consul/segment_oss.go b/agent/consul/segment_oss.go index 8e8936a3c5..1398087a41 100644 --- a/agent/consul/segment_oss.go +++ b/agent/consul/segment_oss.go @@ -20,20 +20,6 @@ var SegmentOSSSummaries = []prometheus.SummaryDefinition{ }, } -// LANMembersAllSegments returns members from all segments. -func (s *Server) LANMembersAllSegments() ([]serf.Member, error) { - return s.LANMembers(), nil -} - -// LANSegmentMembers is used to return the members of the given LAN segment. -func (s *Server) LANSegmentMembers(segment string) ([]serf.Member, error) { - if segment == "" { - return s.LANMembers(), nil - } - - return nil, structs.ErrSegmentsNotSupported -} - // LANSegmentAddr is used to return the address used for the given LAN segment. func (s *Server) LANSegmentAddr(name string) string { return "" diff --git a/agent/consul/serf_filter.go b/agent/consul/serf_filter.go new file mode 100644 index 0000000000..4ea2811d20 --- /dev/null +++ b/agent/consul/serf_filter.go @@ -0,0 +1,27 @@ +package consul + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" +) + +type LANMemberFilter struct { + Partition string + Segment string + AllSegments bool +} + +func (f LANMemberFilter) Validate() error { + if f.AllSegments && f.Segment != "" { + return fmt.Errorf("cannot specify both allSegments and segment filters") + } + if (f.AllSegments || f.Segment != "") && !structs.IsDefaultPartition(f.Partition) { + return fmt.Errorf("segments do not exist outside of the default partition") + } + return nil +} + +func (f LANMemberFilter) PartitionOrDefault() string { + return structs.PartitionOrDefault(f.Partition) +} diff --git a/agent/consul/server.go b/agent/consul/server.go index 65f09d204f..5069e4d802 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -561,7 +561,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) { WithDatacenter(s.config.Datacenter). WithReportingInterval(s.config.MetricsReportingInterval). WithGetMembersFunc(func() []serf.Member { - members, err := s.LANMembersAllSegments() + members, err := s.lanPoolAllMembers() if err != nil { return []serf.Member{} } @@ -993,7 +993,7 @@ func (s *Server) attemptLeadershipTransfer() (success bool) { return true } -// Leave is used to prepare for a graceful shutdown of the server +// Leave is used to prepare for a graceful shutdown. func (s *Server) Leave() error { s.logger.Info("server starting leave") @@ -1097,10 +1097,10 @@ func (s *Server) Leave() error { return nil } -// JoinLAN is used to have Consul join the inner-DC pool -// The target address should be another node inside the DC -// listening on the Serf LAN address -func (s *Server) JoinLAN(addrs []string) (int, error) { +// JoinLAN is used to have Consul join the inner-DC pool The target address +// should be another node inside the DC listening on the Serf LAN address +func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) { + // TODO(partitions): handle the different partitions return s.serfLAN.Join(addrs, true) } @@ -1149,13 +1149,13 @@ func (s *Server) PrimaryGatewayFallbackAddresses() []string { return s.gatewayLocator.PrimaryGatewayFallbackAddresses() } -// LocalMember is used to return the local node -func (s *Server) LocalMember() serf.Member { +// AgentLocalMember is used to retrieve the LAN member for the local node. +func (s *Server) AgentLocalMember() serf.Member { return s.serfLAN.LocalMember() } -// LANMembers is used to return the members of the LAN cluster -func (s *Server) LANMembers() []serf.Member { +// LANMembersInAgentPartition is used to return the members of the LAN cluster +func (s *Server) LANMembersInAgentPartition() []serf.Member { return s.serfLAN.Members() } @@ -1167,8 +1167,9 @@ func (s *Server) WANMembers() []serf.Member { return s.serfWAN.Members() } -// RemoveFailedNode is used to remove a failed node from the cluster -func (s *Server) RemoveFailedNode(node string, prune bool) error { +// RemoveFailedNode is used to remove a failed node from the cluster. +func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error { + // TODO(partitions): handle the different partitions var removeFn func(*serf.Serf, string) error if prune { removeFn = (*serf.Serf).RemoveFailedNodePrune diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index 5b43c0cfed..ad5a30eb07 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -1,3 +1,4 @@ +//go:build !consulent // +build !consulent package consul @@ -5,6 +6,8 @@ package consul import ( "github.com/hashicorp/serf/serf" "google.golang.org/grpc" + + "github.com/hashicorp/consul/agent/structs" ) func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error { @@ -13,3 +16,26 @@ func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) erro } func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {} + +// lanPoolAllMembers only returns our own segment or partition's members, because +// OSS servers can't be in multiple segments or partitions. +func (s *Server) lanPoolAllMembers() ([]serf.Member, error) { + return s.LANMembersInAgentPartition(), nil +} + +// LANMembers returns the LAN members for one of: +// +// - the requested partition +// - the requested segment +// - all segments +// +// This is limited to segments and partitions that the node is a member of. +func (s *Server) LANMembers(filter LANMemberFilter) ([]serf.Member, error) { + if err := filter.Validate(); err != nil { + return nil, err + } + if filter.Segment != "" { + return nil, structs.ErrSegmentsNotSupported + } + return s.LANMembersInAgentPartition(), nil +} diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 30499d59ee..890489668d 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -380,10 +380,10 @@ func TestServer_JoinLAN(t *testing.T) { // Try to join joinLAN(t, s2, s1) retry.Run(t, func(r *retry.R) { - if got, want := len(s1.LANMembers()), 2; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d s1 LAN members want %d", got, want) } - if got, want := len(s2.LANMembers()), 2; got != want { + if got, want := len(s2.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d s2 LAN members want %d", got, want) } }) @@ -428,17 +428,17 @@ func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) { defer rs3.Shutdown() leaderAddr := joinAddrLAN(s1) - if _, err := a2.JoinLAN([]string{leaderAddr}); err != nil { + if _, err := a2.JoinLAN([]string{leaderAddr}, nil); err != nil { t.Fatalf("Expected no error, had: %#v", err) } // Try to join joinWAN(t, rs3, s1) retry.Run(t, func(r *retry.R) { - if got, want := len(s1.LANMembers()), 1; got != want { + if got, want := len(s1.LANMembersInAgentPartition()), 1; got != want { // LAN is blocked, should be 1 only r.Fatalf("got %d s1 LAN members want %d", got, want) } - if got, want := len(a2.LANMembers()), 2; got != want { + if got, want := len(a2.LANMembersInAgentPartition()), 2; got != want { // LAN is blocked a2 can see s1, but not s1 r.Fatalf("got %d a2 LAN members want %d", got, want) } @@ -497,9 +497,9 @@ func TestServer_LANReap(t *testing.T) { testrpc.WaitForLeader(t, s3.RPC, "dc1") retry.Run(t, func(r *retry.R) { - require.Len(r, s1.LANMembers(), 3) - require.Len(r, s2.LANMembers(), 3) - require.Len(r, s3.LANMembers(), 3) + require.Len(r, s1.LANMembersInAgentPartition(), 3) + require.Len(r, s2.LANMembersInAgentPartition(), 3) + require.Len(r, s3.LANMembersInAgentPartition(), 3) }) // Check the router has both @@ -513,7 +513,7 @@ func TestServer_LANReap(t *testing.T) { s2.Shutdown() retry.Run(t, func(r *retry.R) { - require.Len(r, s1.LANMembers(), 2) + require.Len(r, s1.LANMembersInAgentPartition(), 2) servers := s1.serverLookup.Servers() require.Len(r, servers, 2) // require.Equal(r, s1.config.NodeName, servers[0].Name) @@ -930,10 +930,10 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) { if got, want := len(s2.WANMembers()), 3; got != want { r.Fatalf("got %d s2 WAN members want %d", got, want) } - if got, want := len(s2.LANMembers()), 2; got != want { + if got, want := len(s2.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d s2 LAN members want %d", got, want) } - if got, want := len(s3.LANMembers()), 2; got != want { + if got, want := len(s3.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d s3 LAN members want %d", got, want) } }) @@ -964,7 +964,7 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) { // Get and check the lan address of s2 from s3 var s2LanAddr string - for _, lanmember := range s3.LANMembers() { + for _, lanmember := range s3.LANMembersInAgentPartition() { if lanmember.Name == s2Name { s2LanAddr = lanmember.Addr.String() } diff --git a/agent/delegate_mock_test.go b/agent/delegate_mock_test.go index 2273fbd84f..678b0b87bc 100644 --- a/agent/delegate_mock_test.go +++ b/agent/delegate_mock_test.go @@ -25,31 +25,26 @@ func (m *delegateMock) Leave() error { return m.Called().Error(0) } -func (m *delegateMock) LANMembers() []serf.Member { +func (m *delegateMock) LANMembersInAgentPartition() []serf.Member { return m.Called().Get(0).([]serf.Member) } -func (m *delegateMock) LANMembersAllSegments() ([]serf.Member, error) { - ret := m.Called() - return ret.Get(0).([]serf.Member), ret.Error(1) -} - -func (m *delegateMock) LANSegmentMembers(segment string) ([]serf.Member, error) { - ret := m.Called() +func (m *delegateMock) LANMembers(f consul.LANMemberFilter) ([]serf.Member, error) { + ret := m.Called(f) return ret.Get(0).([]serf.Member), ret.Error(1) } -func (m *delegateMock) LocalMember() serf.Member { +func (m *delegateMock) AgentLocalMember() serf.Member { return m.Called().Get(0).(serf.Member) } -func (m *delegateMock) JoinLAN(addrs []string) (n int, err error) { - ret := m.Called(addrs) +func (m *delegateMock) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) { + ret := m.Called(addrs, entMeta) return ret.Int(0), ret.Error(1) } -func (m *delegateMock) RemoveFailedNode(node string, prune bool) error { - return m.Called(node, prune).Error(0) +func (m *delegateMock) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error { + return m.Called(node, prune, entMeta).Error(0) } func (m *delegateMock) ResolveTokenToIdentity(token string) (structs.ACLIdentity, error) { diff --git a/agent/dns.go b/agent/dns.go index b4f7b0c6d7..b95dbefec8 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -136,7 +136,7 @@ func NewDNSServer(a *Agent) (*DNSServer, error) { domain: domain, altDomain: altDomain, logger: a.logger.Named(logging.DNS), - defaultEnterpriseMeta: *a.agentEnterpriseMeta(), + defaultEnterpriseMeta: *a.AgentEnterpriseMeta(), } cfg, err := GetDNSConfig(a.config) if err != nil { diff --git a/agent/dns_test.go b/agent/dns_test.go index ccbf5b012a..32fdef687a 100644 --- a/agent/dns_test.go +++ b/agent/dns_test.go @@ -6250,7 +6250,7 @@ func TestDNS_NonExistentDC_RPC(t *testing.T) { // Join LAN cluster addr := fmt.Sprintf("127.0.0.1:%d", s.Config.SerfPortLAN) - _, err := c.JoinLAN([]string{addr}) + _, err := c.JoinLAN([]string{addr}, nil) require.NoError(t, err) testrpc.WaitForTestAgent(t, c.RPC, "dc1") diff --git a/agent/retry_join.go b/agent/retry_join.go index feaada6657..8cfb00e22e 100644 --- a/agent/retry_join.go +++ b/agent/retry_join.go @@ -5,10 +5,11 @@ import ( "strings" "time" - "github.com/hashicorp/consul/lib" discover "github.com/hashicorp/go-discover" discoverk8s "github.com/hashicorp/go-discover/provider/k8s" "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/consul/lib" ) func (a *Agent) retryJoinLAN() { @@ -18,8 +19,12 @@ func (a *Agent) retryJoinLAN() { addrs: a.config.RetryJoinLAN, maxAttempts: a.config.RetryJoinMaxAttemptsLAN, interval: a.config.RetryJoinIntervalLAN, - join: a.JoinLAN, - logger: a.logger.With("cluster", "LAN"), + join: func(addrs []string) (int, error) { + // NOTE: For partitioned servers you are only capable of using retry join + // to join nodes in the default partition. + return a.JoinLAN(addrs, a.AgentEnterpriseMeta()) + }, + logger: a.logger.With("cluster", "LAN"), } if err := r.retryJoin(); err != nil { a.retryJoinCh <- err diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index e630acf136..9831144d09 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -330,7 +330,7 @@ func TestServiceManager_PersistService_API(t *testing.T) { // Join first _, err := a.JoinLAN([]string{ fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN), - }) + }, nil) require.NoError(err) testrpc.WaitForLeader(t, a.RPC, "dc1") @@ -589,7 +589,7 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) { // Join first _, err := a.JoinLAN([]string{ fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN), - }) + }, nil) require.NoError(t, err) testrpc.WaitForLeader(t, a.RPC, "dc1") diff --git a/agent/structs/structs_oss.go b/agent/structs/structs_oss.go index 2d590ad06a..aa43c8fabb 100644 --- a/agent/structs/structs_oss.go +++ b/agent/structs/structs_oss.go @@ -75,6 +75,14 @@ func (m *EnterpriseMeta) PartitionOrDefault() string { return "default" } +func EqualPartitions(_, _ string) bool { + return true +} + +func IsDefaultPartition(partition string) bool { + return true +} + func PartitionOrDefault(_ string) string { return "default" } diff --git a/command/agent/agent.go b/command/agent/agent.go index 37ec6de84d..1525c28196 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -114,7 +114,9 @@ func (c *cmd) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error { } c.logger.Info("Joining cluster") - n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN) + // NOTE: For partitioned servers you are only capable of using start join + // to join nodes in the default partition. + n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN, agent.AgentEnterpriseMeta()) if err != nil { return err } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 074702130a..45fc700a4b 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -101,7 +101,7 @@ func TestRetryJoin(t *testing.T) { testrpc.WaitForLeader(t, a.RPC, "dc1") retry.Run(t, func(r *retry.R) { - if got, want := len(a.LANMembers()), 2; got != want { + if got, want := len(a.LANMembersInAgentPartition()), 2; got != want { r.Fatalf("got %d LAN members want %d", got, want) } }) diff --git a/command/forceleave/forceleave_test.go b/command/forceleave/forceleave_test.go index aed0606d41..f69059ca05 100644 --- a/command/forceleave/forceleave_test.go +++ b/command/forceleave/forceleave_test.go @@ -4,10 +4,11 @@ import ( "strings" "testing" - "github.com/hashicorp/consul/agent" - "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/serf/serf" "github.com/mitchellh/cli" + + "github.com/hashicorp/consul/agent" + "github.com/hashicorp/consul/sdk/testutil/retry" ) func TestForceLeaveCommand_noTabs(t *testing.T) { @@ -28,7 +29,7 @@ func TestForceLeaveCommand(t *testing.T) { defer a1.Shutdown() defer a2.Shutdown() - _, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()}) + _, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()}, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -48,12 +49,12 @@ func TestForceLeaveCommand(t *testing.T) { t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) } - m := a1.LANMembers() + m := a1.LANMembersInAgentPartition() if len(m) != 2 { t.Fatalf("should have 2 members: %#v", m) } retry.Run(t, func(r *retry.R) { - m = a1.LANMembers() + m = a1.LANMembersInAgentPartition() if got, want := m[1].Status, serf.StatusLeft; got != want { r.Fatalf("got status %q want %q", got, want) } @@ -93,7 +94,7 @@ func TestForceLeaveCommand_prune(t *testing.T) { a2 := agent.StartTestAgent(t, agent.TestAgent{Name: "Agent2"}) defer a2.Shutdown() - _, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()}) + _, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()}, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -114,7 +115,7 @@ func TestForceLeaveCommand_prune(t *testing.T) { t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) } retry.Run(t, func(r *retry.R) { - m := len(a1.LANMembers()) + m := len(a1.LANMembersInAgentPartition()) if m != 1 { r.Fatalf("should have 1 members, got %#v", m) } diff --git a/command/join/join_test.go b/command/join/join_test.go index da9d02f5b1..6bb9235d44 100644 --- a/command/join/join_test.go +++ b/command/join/join_test.go @@ -38,8 +38,8 @@ func TestJoinCommandJoin_lan(t *testing.T) { t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) } - if len(a1.LANMembers()) != 2 { - t.Fatalf("bad: %#v", a1.LANMembers()) + if len(a1.LANMembersInAgentPartition()) != 2 { + t.Fatalf("bad: %#v", a1.LANMembersInAgentPartition()) } }