agent: refactor the agent delegate interface to be partition friendly (#11429)

pull/11434/head
R.B. Boyer 3 years ago committed by GitHub
parent ff6a33511e
commit ef559dfdd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,3 @@
```release-note:improvement
agent: refactor the agent delegate interface to be partition friendly
```

@ -99,7 +99,7 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru
}
} else {
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.agentEnterpriseMeta()))
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
}
}
@ -111,7 +111,7 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru
}
} else {
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.agentEnterpriseMeta()))
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
}
}
}
@ -131,7 +131,7 @@ func (a *Agent) vetCheckUpdateWithAuthorizer(authz acl.Authorizer, checkID struc
}
} else {
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.agentEnterpriseMeta()))
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
}
}
} else {

@ -128,25 +128,21 @@ func (a *TestACLAgent) GetLANCoordinate() (lib.CoordinateSet, error) {
func (a *TestACLAgent) Leave() error {
return fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) LANMembers() []serf.Member {
func (a *TestACLAgent) LANMembersInAgentPartition() []serf.Member {
return nil
}
func (a *TestACLAgent) LANMembersAllSegments() ([]serf.Member, error) {
func (a *TestACLAgent) LANMembers(f consul.LANMemberFilter) ([]serf.Member, error) {
return nil, fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) LANSegmentMembers(segment string) ([]serf.Member, error) {
return nil, fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) LocalMember() serf.Member {
func (a *TestACLAgent) AgentLocalMember() serf.Member {
return serf.Member{}
}
func (a *TestACLAgent) JoinLAN(addrs []string) (n int, err error) {
func (a *TestACLAgent) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) {
return 0, fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) RemoveFailedNode(node string, prune bool) error {
func (a *TestACLAgent) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
return fmt.Errorf("Unimplemented")
}
func (a *TestACLAgent) RPC(method string, args interface{}, reply interface{}) error {
return fmt.Errorf("Unimplemented")
}

@ -124,14 +124,35 @@ func ConfigSourceFromName(name string) (configSource, bool) {
// delegate defines the interface shared by both
// consul.Client and consul.Server.
type delegate interface {
GetLANCoordinate() (lib.CoordinateSet, error)
// Leave is used to prepare for a graceful shutdown.
Leave() error
LANMembers() []serf.Member
LANMembersAllSegments() ([]serf.Member, error)
LANSegmentMembers(segment string) ([]serf.Member, error)
LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string, prune bool) error
// AgentLocalMember is used to retrieve the LAN member for the local node.
AgentLocalMember() serf.Member
// LANMembersInAgentPartition returns the LAN members for this agent's
// canonical serf pool. For clients this is the only pool that exists. For
// servers it's the pool in the default segment and the default partition.
LANMembersInAgentPartition() []serf.Member
// LANMembers returns the LAN members for one of:
//
// - the requested partition
// - the requested segment
// - all segments
//
// This is limited to segments and partitions that the node is a member of.
LANMembers(f consul.LANMemberFilter) ([]serf.Member, error)
// GetLANCoordinate returns the coordinate of the node in the LAN gossip pool.
GetLANCoordinate() (lib.CoordinateSet, error)
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error)
// RemoveFailedNode is used to remove a failed node from the cluster.
RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error
// TODO: replace this method with consul.ACLResolver
ResolveTokenToIdentity(token string) (structs.ACLIdentity, error)
@ -515,8 +536,11 @@ func (a *Agent) Start(ctx context.Context) error {
a.delegate = client
}
// the staggering of the state syncing depends on the cluster size.
a.sync.ClusterSize = func() int { return len(a.delegate.LANMembers()) }
// The staggering of the state syncing depends on the cluster size.
//
// NOTE: we will use the agent's canonical serf pool for this since that's
// similarly scoped with the state store side of anti-entropy.
a.sync.ClusterSize = func() int { return len(a.delegate.LANMembersInAgentPartition()) }
// link the state with the consul server/client and the state syncer
// via callbacks. After several attempts this was easier than using
@ -1443,9 +1467,9 @@ func (a *Agent) ShutdownCh() <-chan struct{} {
}
// JoinLAN is used to have the agent join a LAN cluster
func (a *Agent) JoinLAN(addrs []string) (n int, err error) {
func (a *Agent) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) {
a.logger.Info("(LAN) joining", "lan_addresses", addrs)
n, err = a.delegate.JoinLAN(addrs)
n, err = a.delegate.JoinLAN(addrs, entMeta)
if err == nil {
a.logger.Info("(LAN) joined", "number_of_nodes", n)
if a.joinLANNotifier != nil {
@ -1510,12 +1534,13 @@ func (a *Agent) RefreshPrimaryGatewayFallbackAddresses(addrs []string) error {
}
// ForceLeave is used to remove a failed node from the cluster
func (a *Agent) ForceLeave(node string, prune bool) (err error) {
func (a *Agent) ForceLeave(node string, prune bool, entMeta *structs.EnterpriseMeta) (err error) {
a.logger.Info("Force leaving node", "node", node)
// TODO(partitions): merge IsMember into the RemoveFailedNode call.
if ok := a.IsMember(node); !ok {
return fmt.Errorf("agent: No node found with name '%s'", node)
}
err = a.delegate.RemoveFailedNode(node, prune)
err = a.delegate.RemoveFailedNode(node, prune, entMeta)
if err != nil {
a.logger.Warn("Failed to remove node",
"node", node,
@ -1525,20 +1550,19 @@ func (a *Agent) ForceLeave(node string, prune bool) (err error) {
return err
}
// LocalMember is used to return the local node
func (a *Agent) LocalMember() serf.Member {
return a.delegate.LocalMember()
// AgentLocalMember is used to retrieve the LAN member for the local node.
func (a *Agent) AgentLocalMember() serf.Member {
return a.delegate.AgentLocalMember()
}
// LANMembers is used to retrieve the LAN members
func (a *Agent) LANMembers() []serf.Member {
// TODO(partitions): filter this by the partition?
return a.delegate.LANMembers()
// LANMembersInAgentPartition is used to retrieve the LAN members for this
// agent's partition.
func (a *Agent) LANMembersInAgentPartition() []serf.Member {
return a.delegate.LANMembersInAgentPartition()
}
// WANMembers is used to retrieve the WAN members
func (a *Agent) WANMembers() []serf.Member {
// TODO(partitions): filter this by the partition by omitting wan results for now?
if srv, ok := a.delegate.(*consul.Server); ok {
return srv.WANMembers()
}
@ -1548,7 +1572,7 @@ func (a *Agent) WANMembers() []serf.Member {
// IsMember is used to check if a node with the given nodeName
// is a member
func (a *Agent) IsMember(nodeName string) bool {
for _, m := range a.LANMembers() {
for _, m := range a.LANMembersInAgentPartition() {
if m.Name == nodeName {
return true
}
@ -1626,12 +1650,12 @@ OUTER:
for {
rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin
intv := lib.RateScaledInterval(rate, min, len(a.LANMembers()))
intv := lib.RateScaledInterval(rate, min, len(a.LANMembersInAgentPartition()))
intv = intv + lib.RandomStagger(intv)
select {
case <-time.After(intv):
members := a.LANMembers()
members := a.LANMembersInAgentPartition()
grok, err := consul.CanServersUnderstandProtocol(members, 3)
if err != nil {
a.logger.Error("Failed to check servers", "error", err)
@ -1655,7 +1679,7 @@ OUTER:
Node: a.config.NodeName,
Segment: segment,
Coord: coord,
EnterpriseMeta: *a.agentEnterpriseMeta(),
EnterpriseMeta: *a.AgentEnterpriseMeta(),
WriteRequest: structs.WriteRequest{Token: agentToken},
}
var reply struct{}
@ -2732,7 +2756,7 @@ func (a *Agent) addCheck(check *structs.HealthCheck, chkType *structs.CheckType,
var rpcReq structs.NodeSpecificRequest
rpcReq.Datacenter = a.config.Datacenter
rpcReq.EnterpriseMeta = *a.agentEnterpriseMeta()
rpcReq.EnterpriseMeta = *a.AgentEnterpriseMeta()
// The token to set is really important. The behavior below follows
// the same behavior as anti-entropy: we use the user-specified token

@ -20,6 +20,7 @@ import (
"github.com/hashicorp/consul/acl"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/debug"
"github.com/hashicorp/consul/agent/structs"
token_store "github.com/hashicorp/consul/agent/token"
@ -58,7 +59,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -104,7 +105,7 @@ func (s *HTTPHandlers) AgentSelf(resp http.ResponseWriter, req *http.Request) (i
Config: config,
DebugConfig: s.agent.config.Sanitized(),
Coord: cs[s.agent.config.SegmentName],
Member: s.agent.LocalMember(),
Member: s.agent.AgentLocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.State.Metadata(),
XDS: xds,
@ -148,7 +149,7 @@ func (s *HTTPHandlers) AgentMetrics(resp http.ResponseWriter, req *http.Request)
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -183,7 +184,7 @@ func (s *HTTPHandlers) AgentMetricsStream(resp http.ResponseWriter, req *http.Re
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -236,7 +237,7 @@ func (s *HTTPHandlers) AgentReload(resp http.ResponseWriter, req *http.Request)
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -520,12 +521,16 @@ func (s *HTTPHandlers) AgentMembers(resp http.ResponseWriter, req *http.Request)
if wan {
members = s.agent.WANMembers()
} else {
var err error
filter := consul.LANMemberFilter{
// TODO(partitions): insert the partition from the request
}
if segment == api.AllSegments {
members, err = s.agent.delegate.LANMembersAllSegments()
filter.AllSegments = true
} else {
members, err = s.agent.delegate.LANSegmentMembers(segment)
filter.Segment = segment
}
var err error
members, err = s.agent.delegate.LANMembers(filter)
if err != nil {
return nil, err
}
@ -547,7 +552,7 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -568,7 +573,8 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i
}
_, err = s.agent.JoinWAN([]string{addr})
} else {
_, err = s.agent.JoinLAN([]string{addr})
// TODO(partitions): use the request entmeta
_, err = s.agent.JoinLAN([]string{addr}, nil)
}
return nil, err
}
@ -584,7 +590,7 @@ func (s *HTTPHandlers) AgentLeave(resp http.ResponseWriter, req *http.Request) (
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -612,7 +618,8 @@ func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Reque
_, prune := req.URL.Query()["prune"]
addr := strings.TrimPrefix(req.URL.Path, "/v1/agent/force-leave/")
return nil, s.agent.ForceLeave(addr, prune)
// TODO(partitions): use the request entmeta
return nil, s.agent.ForceLeave(addr, prune, nil)
}
// syncChanges is a helper function which wraps a blocking call to sync
@ -1274,7 +1281,7 @@ func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http.
}
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.NodeWrite(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -1299,7 +1306,7 @@ func (s *HTTPHandlers) AgentMonitor(resp http.ResponseWriter, req *http.Request)
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentRead(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}
@ -1382,7 +1389,7 @@ func (s *HTTPHandlers) AgentToken(resp http.ResponseWriter, req *http.Request) (
// Authorize using the agent's own enterprise meta, not the token.
var authzContext acl.AuthorizerContext
s.agent.agentEnterpriseMeta().FillAuthzContext(&authzContext)
s.agent.AgentEnterpriseMeta().FillAuthzContext(&authzContext)
if authz.AgentWrite(s.agent.config.NodeName, &authzContext) != acl.Allow {
return nil, acl.ErrPermissionDenied
}

@ -1891,12 +1891,12 @@ func TestAgent_Join(t *testing.T) {
t.Fatalf("Err: %v", obj)
}
if len(a1.LANMembers()) != 2 {
if len(a1.LANMembersInAgentPartition()) != 2 {
t.Fatalf("should have 2 members")
}
retry.Run(t, func(r *retry.R) {
if got, want := len(a2.LANMembers()), 2; got != want {
if got, want := len(a2.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d LAN members want %d", got, want)
}
})
@ -2002,7 +2002,7 @@ func TestAgent_JoinLANNotify(t *testing.T) {
a1.joinLANNotifier = notif
addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN)
_, err := a1.JoinLAN([]string{addr})
_, err := a1.JoinLAN([]string{addr}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -2030,7 +2030,7 @@ func TestAgent_Leave(t *testing.T) {
// Join first
addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN)
_, err := a1.JoinLAN([]string{addr})
_, err := a1.JoinLAN([]string{addr}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -2045,7 +2045,7 @@ func TestAgent_Leave(t *testing.T) {
t.Fatalf("Err: %v", obj)
}
retry.Run(t, func(r *retry.R) {
m := a1.LANMembers()
m := a1.LANMembersInAgentPartition()
if got, want := m[1].Status, serf.StatusLeft; got != want {
r.Fatalf("got status %q want %q", got, want)
}
@ -2101,7 +2101,7 @@ func TestAgent_ForceLeave(t *testing.T) {
// Join first
addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN)
_, err := a1.JoinLAN([]string{addr})
_, err := a1.JoinLAN([]string{addr}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -2110,7 +2110,7 @@ func TestAgent_ForceLeave(t *testing.T) {
a2.Shutdown()
// Wait for agent being marked as failed, so we wait for full shutdown of Agent
retry.Run(t, func(r *retry.R) {
m := a1.LANMembers()
m := a1.LANMembersInAgentPartition()
if got, want := m[1].Status, serf.StatusFailed; got != want {
r.Fatalf("got status %q want %q", got, want)
}
@ -2126,7 +2126,7 @@ func TestAgent_ForceLeave(t *testing.T) {
t.Fatalf("Err: %v", obj)
}
retry.Run(t, func(r *retry.R) {
m := a1.LANMembers()
m := a1.LANMembersInAgentPartition()
if got, want := m[1].Status, serf.StatusLeft; got != want {
r.Fatalf("got status %q want %q", got, want)
}
@ -2210,7 +2210,7 @@ func TestAgent_ForceLeavePrune(t *testing.T) {
// Join first
addr := fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN)
_, err := a1.JoinLAN([]string{addr})
_, err := a1.JoinLAN([]string{addr}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -2219,7 +2219,7 @@ func TestAgent_ForceLeavePrune(t *testing.T) {
a2.Shutdown()
// Wait for agent being marked as failed, so we wait for full shutdown of Agent
retry.Run(t, func(r *retry.R) {
m := a1.LANMembers()
m := a1.LANMembersInAgentPartition()
for _, member := range m {
if member.Name == a2.Config.NodeName {
if member.Status != serf.StatusFailed {
@ -2239,7 +2239,7 @@ func TestAgent_ForceLeavePrune(t *testing.T) {
t.Fatalf("Err: %v", obj)
}
retry.Run(t, func(r *retry.R) {
m := len(a1.LANMembers())
m := len(a1.LANMembersInAgentPartition())
if m != 1 {
r.Fatalf("want one member, got %v", m)
}

@ -51,6 +51,6 @@ func (a *Agent) enterpriseStats() map[string]map[string]string {
return nil
}
func (a *Agent) agentEnterpriseMeta() *structs.EnterpriseMeta {
func (a *Agent) AgentEnterpriseMeta() *structs.EnterpriseMeta {
return structs.NodeEnterpriseMetaInDefaultPartition()
}

@ -4500,7 +4500,7 @@ services {
// Now connect to server
_, err := a1.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", a2.Config.SerfPortLAN),
})
}, nil)
require.NoError(t, err)
t.Logf("joined client to server")

@ -32,7 +32,10 @@ func (b autoConfigBackend) GetCARoots() (*structs.IndexedCARoots, error) {
// DatacenterJoinAddresses will return all the strings suitable for usage in
// retry join operations to connect to the the LAN or LAN segment gossip pool.
func (b autoConfigBackend) DatacenterJoinAddresses(segment string) ([]string, error) {
members, err := b.Server.LANSegmentMembers(segment)
members, err := b.Server.LANMembers(LANMemberFilter{
Segment: segment,
Partition: "", // TODO(partitions): figure out what goes here
})
if err != nil {
return nil, fmt.Errorf("Failed to retrieve members for segment %s - %w", segment, err)
}

@ -66,8 +66,9 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) {
}
func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) {
serverEntMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
go func() {
if err := d.server.RemoveFailedNode(srv.Name, false); err != nil {
if err := d.server.RemoveFailedNode(srv.Name, false, serverEntMeta); err != nil {
d.server.logger.Error("failed to remove server", "name", srv.Name, "id", srv.ID, "error", err)
}
}()

@ -101,7 +101,7 @@ func TestAutopilot_CleanupDeadServer(t *testing.T) {
retry.Run(t, func(r *retry.R) {
alive := 0
for _, m := range servers[leaderIndex].LANMembers() {
for _, m := range servers[leaderIndex].LANMembersInAgentPartition() {
if m.Status == serf.StatusAlive {
alive++
}
@ -498,7 +498,7 @@ func TestAutopilot_MinQuorum(t *testing.T) {
if leader == nil {
r.Fatalf("no members set")
}
for _, m := range leader.LANMembers() {
for _, m := range leader.LANMembersInAgentPartition() {
if m.Name == dead.config.NodeName && m.Status != serf.StatusLeft {
r.Fatalf("%v should be left, got %v", m.Name, m.Status.String())
}
@ -515,7 +515,7 @@ func TestAutopilot_MinQuorum(t *testing.T) {
if leader == nil {
r.Fatalf("no members set")
}
for _, m := range leader.LANMembers() {
for _, m := range leader.LANMembersInAgentPartition() {
if m.Name == dead.config.NodeName && m.Status != serf.StatusFailed {
r.Fatalf("%v should be failed, got %v", m.Name, m.Status.String())
}

@ -177,7 +177,7 @@ func (c *Client) Shutdown() error {
return nil
}
// Leave is used to prepare for a graceful shutdown
// Leave is used to prepare for a graceful shutdown.
func (c *Client) Leave() error {
c.logger.Info("client starting leave")
@ -190,40 +190,49 @@ func (c *Client) Leave() error {
return nil
}
// JoinLAN is used to have Consul client join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (c *Client) JoinLAN(addrs []string) (int, error) {
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
func (c *Client) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {
// TODO(partitions): assert that the partitions match
return c.serf.Join(addrs, true)
}
// LocalMember is used to return the local node
func (c *Client) LocalMember() serf.Member {
// AgentLocalMember is used to retrieve the LAN member for the local node.
func (c *Client) AgentLocalMember() serf.Member {
return c.serf.LocalMember()
}
// LANMembers is used to return the members of the LAN cluster
func (c *Client) LANMembers() []serf.Member {
// LANMembersInAgentPartition returns the LAN members for this agent's
// canonical serf pool. For clients this is the only pool that exists. For
// servers it's the pool in the default segment and the default partition.
func (c *Client) LANMembersInAgentPartition() []serf.Member {
return c.serf.Members()
}
// LANMembersAllSegments returns members from all segments.
func (c *Client) LANMembersAllSegments() ([]serf.Member, error) {
return c.serf.Members(), nil
}
// LANMembers returns the LAN members for one of:
//
// - the requested partition
// - the requested segment
// - all segments
//
// This is limited to segments and partitions that the node is a member of.
func (c *Client) LANMembers(filter LANMemberFilter) ([]serf.Member, error) {
if err := filter.Validate(); err != nil {
return nil, err
}
// TODO(partitions): assert that the partitions match
// LANSegmentMembers only returns our own segment's members, because clients
// can't be in multiple segments.
func (c *Client) LANSegmentMembers(segment string) ([]serf.Member, error) {
if segment == c.config.Segment {
return c.LANMembers(), nil
if !filter.AllSegments && filter.Segment != c.config.Segment {
return nil, fmt.Errorf("segment %q not found", filter.Segment)
}
return nil, fmt.Errorf("segment %q not found", segment)
return c.serf.Members(), nil
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (c *Client) RemoveFailedNode(node string, prune bool) error {
// RemoveFailedNode is used to remove a failed node from the cluster.
func (c *Client) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
// TODO(partitions): assert that the partitions match
if prune {
return c.serf.RemoveFailedNodePrune(node)
}
@ -362,9 +371,9 @@ func (c *Client) Stats() map[string]map[string]string {
return stats
}
// GetLANCoordinate returns the network coordinate of the current node, as
// maintained by Serf.
// GetLANCoordinate returns the coordinate of the node in the LAN gossip pool.
func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
// TODO(partitions): possibly something here
lan, err := c.serf.GetCoordinate()
if err != nil {
return nil, err

@ -124,10 +124,10 @@ func TestClient_JoinLAN(t *testing.T) {
if got, want := c1.router.GetLANManager().NumServers(), 1; got != want {
r.Fatalf("got %d servers want %d", got, want)
}
if got, want := len(s1.LANMembers()), 2; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d server LAN members want %d", got, want)
}
if got, want := len(c1.LANMembers()), 2; got != want {
if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d client LAN members want %d", got, want)
}
})
@ -157,8 +157,8 @@ func TestClient_LANReap(t *testing.T) {
testrpc.WaitForLeader(t, c1.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 2)
require.Len(r, c1.LANMembers(), 2)
require.Len(r, s1.LANMembersInAgentPartition(), 2)
require.Len(r, c1.LANMembersInAgentPartition(), 2)
})
// Check the router has both
@ -172,7 +172,7 @@ func TestClient_LANReap(t *testing.T) {
s1.Shutdown()
retry.Run(t, func(r *retry.R) {
require.Len(r, c1.LANMembers(), 1)
require.Len(r, c1.LANMembersInAgentPartition(), 1)
server := c1.router.FindLANServer()
require.Nil(t, server)
})
@ -193,15 +193,15 @@ func TestClient_JoinLAN_Invalid(t *testing.T) {
defer c1.Shutdown()
// Try to join
if _, err := c1.JoinLAN([]string{joinAddrLAN(s1)}); err == nil {
if _, err := c1.JoinLAN([]string{joinAddrLAN(s1)}, nil); err == nil {
t.Fatal("should error")
}
time.Sleep(50 * time.Millisecond)
if len(s1.LANMembers()) != 1 {
if len(s1.LANMembersInAgentPartition()) != 1 {
t.Fatalf("should not join")
}
if len(c1.LANMembers()) != 1 {
if len(c1.LANMembersInAgentPartition()) != 1 {
t.Fatalf("should not join")
}
}
@ -217,7 +217,7 @@ func TestClient_JoinWAN_Invalid(t *testing.T) {
defer c1.Shutdown()
// Try to join
if _, err := c1.JoinLAN([]string{joinAddrWAN(s1)}); err == nil {
if _, err := c1.JoinLAN([]string{joinAddrWAN(s1)}, nil); err == nil {
t.Fatal("should error")
}
@ -225,7 +225,7 @@ func TestClient_JoinWAN_Invalid(t *testing.T) {
if len(s1.WANMembers()) != 1 {
t.Fatalf("should not join")
}
if len(c1.LANMembers()) != 1 {
if len(c1.LANMembersInAgentPartition()) != 1 {
t.Fatalf("should not join")
}
}
@ -250,11 +250,11 @@ func TestClient_RPC(t *testing.T) {
joinLAN(t, c1, s1)
// Check the members
if len(s1.LANMembers()) != 2 {
if len(s1.LANMembersInAgentPartition()) != 2 {
t.Fatalf("bad len")
}
if len(c1.LANMembers()) != 2 {
if len(c1.LANMembersInAgentPartition()) != 2 {
t.Fatalf("bad len")
}
@ -354,10 +354,10 @@ func TestClient_RPC_Pool(t *testing.T) {
// Wait for both agents to finish joining
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 2; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d server LAN members want %d", got, want)
}
if got, want := len(c1.LANMembers()), 2; got != want {
if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d client LAN members want %d", got, want)
}
})
@ -418,12 +418,12 @@ func TestClient_RPC_ConsulServerPing(t *testing.T) {
c.router.GetLANManager().ResetRebalanceTimer()
time.Sleep(time.Second)
if len(c.LANMembers()) != numServers+numClients {
t.Errorf("bad len: %d", len(c.LANMembers()))
if len(c.LANMembersInAgentPartition()) != numServers+numClients {
t.Errorf("bad len: %d", len(c.LANMembersInAgentPartition()))
}
for _, s := range servers {
if len(s.LANMembers()) != numServers+numClients {
t.Errorf("bad len: %d", len(s.LANMembers()))
if len(s.LANMembersInAgentPartition()) != numServers+numClients {
t.Errorf("bad len: %d", len(s.LANMembersInAgentPartition()))
}
}
@ -476,10 +476,10 @@ func TestClient_RPC_TLS(t *testing.T) {
// Wait for joins to finish/RPC to succeed
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 2; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d server LAN members want %d", got, want)
}
if got, want := len(c1.LANMembers()), 2; got != want {
if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d client LAN members want %d", got, want)
}
if err := c1.RPC("Status.Ping", struct{}{}, &out); err != nil {
@ -684,10 +684,10 @@ func TestClient_SnapshotRPC_TLS(t *testing.T) {
// Try to join.
joinLAN(t, c1, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 2; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d server members want %d", got, want)
}
if got, want := len(c1.LANMembers()), 2; got != want {
if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d client members want %d", got, want)
}
@ -746,10 +746,10 @@ func TestClientServer_UserEvent(t *testing.T) {
// Check the members
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 2; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d server LAN members want %d", got, want)
}
if got, want := len(c1.LANMembers()), 2; got != want {
if got, want := len(c1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d client LAN members want %d", got, want)
}
})
@ -845,8 +845,8 @@ func TestClient_ShortReconnectTimeout(t *testing.T) {
// up to 10x the time in the case of slow CI.
require.Eventually(t,
func() bool {
return len(cluster.Servers[0].LANMembers()) == 2 &&
len(cluster.Clients[0].LANMembers()) == 2
return len(cluster.Servers[0].LANMembersInAgentPartition()) == 2 &&
len(cluster.Clients[0].LANMembersInAgentPartition()) == 2
},
time.Second,

@ -108,8 +108,8 @@ func joinAddrWAN(s *Server) string {
}
type clientOrServer interface {
JoinLAN(addrs []string) (int, error)
LANMembers() []serf.Member
JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error)
LANMembersInAgentPartition() []serf.Member
}
// joinLAN is a convenience function for
@ -129,15 +129,15 @@ func joinLAN(t *testing.T, member clientOrServer, leader *Server) {
memberAddr = fmt.Sprintf("127.0.0.1:%d", x.config.SerfLANConfig.MemberlistConfig.BindPort)
}
leaderAddr := joinAddrLAN(leader)
if _, err := member.JoinLAN([]string{leaderAddr}); err != nil {
if _, err := member.JoinLAN([]string{leaderAddr}, nil); err != nil {
t.Fatal(err)
}
retry.Run(t, func(r *retry.R) {
if !seeEachOther(leader.LANMembers(), member.LANMembers(), leaderAddr, memberAddr) {
if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) {
r.Fatalf("leader and member cannot see each other on LAN")
}
})
if !seeEachOther(leader.LANMembers(), member.LANMembers(), leaderAddr, memberAddr) {
if !seeEachOther(leader.LANMembersInAgentPartition(), member.LANMembersInAgentPartition(), leaderAddr, memberAddr) {
t.Fatalf("leader and member cannot see each other on LAN")
}
}

@ -249,7 +249,7 @@ func TestLeader_ReapMember(t *testing.T) {
})
// Simulate a node reaping
mems := s1.LANMembers()
mems := s1.LANMembersInAgentPartition()
var c1mem serf.Member
for _, m := range mems {
if m.Name == c1.config.NodeName {
@ -688,7 +688,7 @@ func TestLeader_LeftServer(t *testing.T) {
servers[0].Shutdown()
// Force remove the non-leader (transition to left state)
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false); err != nil {
if err := servers[1].RemoveFailedNode(servers[0].config.NodeName, false, nil); err != nil {
t.Fatalf("err: %v", err)
}
@ -1045,7 +1045,7 @@ func TestLeader_ChangeServerID(t *testing.T) {
retry.Run(t, func(r *retry.R) {
alive := 0
for _, m := range s1.LANMembers() {
for _, m := range s1.LANMembersInAgentPartition() {
if m.Status == serf.StatusAlive {
alive++
}
@ -1118,10 +1118,10 @@ func TestLeader_ChangeNodeID(t *testing.T) {
// Shut down a server, freeing up its address/port
s3.Shutdown()
// wait for s1.LANMembers() to show s3 as StatusFailed or StatusLeft on
// wait for s1.LANMembersInAgentPartition() to show s3 as StatusFailed or StatusLeft on
retry.Run(t, func(r *retry.R) {
var gone bool
for _, m := range s1.LANMembers() {
for _, m := range s1.LANMembersInAgentPartition() {
if m.Name == s3.config.NodeName && (m.Status == serf.StatusFailed || m.Status == serf.StatusLeft) {
gone = true
}
@ -1149,7 +1149,7 @@ func TestLeader_ChangeNodeID(t *testing.T) {
})
retry.Run(t, func(r *retry.R) {
for _, m := range s1.LANMembers() {
for _, m := range s1.LANMembersInAgentPartition() {
require.Equal(r, serf.StatusAlive, m.Status)
}
})

@ -20,20 +20,6 @@ var SegmentOSSSummaries = []prometheus.SummaryDefinition{
},
}
// LANMembersAllSegments returns members from all segments.
func (s *Server) LANMembersAllSegments() ([]serf.Member, error) {
return s.LANMembers(), nil
}
// LANSegmentMembers is used to return the members of the given LAN segment.
func (s *Server) LANSegmentMembers(segment string) ([]serf.Member, error) {
if segment == "" {
return s.LANMembers(), nil
}
return nil, structs.ErrSegmentsNotSupported
}
// LANSegmentAddr is used to return the address used for the given LAN segment.
func (s *Server) LANSegmentAddr(name string) string {
return ""

@ -0,0 +1,27 @@
package consul
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
)
type LANMemberFilter struct {
Partition string
Segment string
AllSegments bool
}
func (f LANMemberFilter) Validate() error {
if f.AllSegments && f.Segment != "" {
return fmt.Errorf("cannot specify both allSegments and segment filters")
}
if (f.AllSegments || f.Segment != "") && !structs.IsDefaultPartition(f.Partition) {
return fmt.Errorf("segments do not exist outside of the default partition")
}
return nil
}
func (f LANMemberFilter) PartitionOrDefault() string {
return structs.PartitionOrDefault(f.Partition)
}

@ -561,7 +561,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
WithDatacenter(s.config.Datacenter).
WithReportingInterval(s.config.MetricsReportingInterval).
WithGetMembersFunc(func() []serf.Member {
members, err := s.LANMembersAllSegments()
members, err := s.lanPoolAllMembers()
if err != nil {
return []serf.Member{}
}
@ -993,7 +993,7 @@ func (s *Server) attemptLeadershipTransfer() (success bool) {
return true
}
// Leave is used to prepare for a graceful shutdown of the server
// Leave is used to prepare for a graceful shutdown.
func (s *Server) Leave() error {
s.logger.Info("server starting leave")
@ -1097,10 +1097,10 @@ func (s *Server) Leave() error {
return nil
}
// JoinLAN is used to have Consul join the inner-DC pool
// The target address should be another node inside the DC
// listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string) (int, error) {
// JoinLAN is used to have Consul join the inner-DC pool The target address
// should be another node inside the DC listening on the Serf LAN address
func (s *Server) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (int, error) {
// TODO(partitions): handle the different partitions
return s.serfLAN.Join(addrs, true)
}
@ -1149,13 +1149,13 @@ func (s *Server) PrimaryGatewayFallbackAddresses() []string {
return s.gatewayLocator.PrimaryGatewayFallbackAddresses()
}
// LocalMember is used to return the local node
func (s *Server) LocalMember() serf.Member {
// AgentLocalMember is used to retrieve the LAN member for the local node.
func (s *Server) AgentLocalMember() serf.Member {
return s.serfLAN.LocalMember()
}
// LANMembers is used to return the members of the LAN cluster
func (s *Server) LANMembers() []serf.Member {
// LANMembersInAgentPartition is used to return the members of the LAN cluster
func (s *Server) LANMembersInAgentPartition() []serf.Member {
return s.serfLAN.Members()
}
@ -1167,8 +1167,9 @@ func (s *Server) WANMembers() []serf.Member {
return s.serfWAN.Members()
}
// RemoveFailedNode is used to remove a failed node from the cluster
func (s *Server) RemoveFailedNode(node string, prune bool) error {
// RemoveFailedNode is used to remove a failed node from the cluster.
func (s *Server) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
// TODO(partitions): handle the different partitions
var removeFn func(*serf.Serf, string) error
if prune {
removeFn = (*serf.Serf).RemoveFailedNodePrune

@ -1,3 +1,4 @@
//go:build !consulent
// +build !consulent
package consul
@ -5,6 +6,8 @@ package consul
import (
"github.com/hashicorp/serf/serf"
"google.golang.org/grpc"
"github.com/hashicorp/consul/agent/structs"
)
func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error {
@ -13,3 +16,26 @@ func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) erro
}
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}
// lanPoolAllMembers only returns our own segment or partition's members, because
// OSS servers can't be in multiple segments or partitions.
func (s *Server) lanPoolAllMembers() ([]serf.Member, error) {
return s.LANMembersInAgentPartition(), nil
}
// LANMembers returns the LAN members for one of:
//
// - the requested partition
// - the requested segment
// - all segments
//
// This is limited to segments and partitions that the node is a member of.
func (s *Server) LANMembers(filter LANMemberFilter) ([]serf.Member, error) {
if err := filter.Validate(); err != nil {
return nil, err
}
if filter.Segment != "" {
return nil, structs.ErrSegmentsNotSupported
}
return s.LANMembersInAgentPartition(), nil
}

@ -380,10 +380,10 @@ func TestServer_JoinLAN(t *testing.T) {
// Try to join
joinLAN(t, s2, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 2; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s1 LAN members want %d", got, want)
}
if got, want := len(s2.LANMembers()), 2; got != want {
if got, want := len(s2.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s2 LAN members want %d", got, want)
}
})
@ -428,17 +428,17 @@ func TestServer_JoinLAN_SerfAllowedCIDRs(t *testing.T) {
defer rs3.Shutdown()
leaderAddr := joinAddrLAN(s1)
if _, err := a2.JoinLAN([]string{leaderAddr}); err != nil {
if _, err := a2.JoinLAN([]string{leaderAddr}, nil); err != nil {
t.Fatalf("Expected no error, had: %#v", err)
}
// Try to join
joinWAN(t, rs3, s1)
retry.Run(t, func(r *retry.R) {
if got, want := len(s1.LANMembers()), 1; got != want {
if got, want := len(s1.LANMembersInAgentPartition()), 1; got != want {
// LAN is blocked, should be 1 only
r.Fatalf("got %d s1 LAN members want %d", got, want)
}
if got, want := len(a2.LANMembers()), 2; got != want {
if got, want := len(a2.LANMembersInAgentPartition()), 2; got != want {
// LAN is blocked a2 can see s1, but not s1
r.Fatalf("got %d a2 LAN members want %d", got, want)
}
@ -497,9 +497,9 @@ func TestServer_LANReap(t *testing.T) {
testrpc.WaitForLeader(t, s3.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 3)
require.Len(r, s2.LANMembers(), 3)
require.Len(r, s3.LANMembers(), 3)
require.Len(r, s1.LANMembersInAgentPartition(), 3)
require.Len(r, s2.LANMembersInAgentPartition(), 3)
require.Len(r, s3.LANMembersInAgentPartition(), 3)
})
// Check the router has both
@ -513,7 +513,7 @@ func TestServer_LANReap(t *testing.T) {
s2.Shutdown()
retry.Run(t, func(r *retry.R) {
require.Len(r, s1.LANMembers(), 2)
require.Len(r, s1.LANMembersInAgentPartition(), 2)
servers := s1.serverLookup.Servers()
require.Len(r, servers, 2)
// require.Equal(r, s1.config.NodeName, servers[0].Name)
@ -930,10 +930,10 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
if got, want := len(s2.WANMembers()), 3; got != want {
r.Fatalf("got %d s2 WAN members want %d", got, want)
}
if got, want := len(s2.LANMembers()), 2; got != want {
if got, want := len(s2.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s2 LAN members want %d", got, want)
}
if got, want := len(s3.LANMembers()), 2; got != want {
if got, want := len(s3.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d s3 LAN members want %d", got, want)
}
})
@ -964,7 +964,7 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
// Get and check the lan address of s2 from s3
var s2LanAddr string
for _, lanmember := range s3.LANMembers() {
for _, lanmember := range s3.LANMembersInAgentPartition() {
if lanmember.Name == s2Name {
s2LanAddr = lanmember.Addr.String()
}

@ -25,31 +25,26 @@ func (m *delegateMock) Leave() error {
return m.Called().Error(0)
}
func (m *delegateMock) LANMembers() []serf.Member {
func (m *delegateMock) LANMembersInAgentPartition() []serf.Member {
return m.Called().Get(0).([]serf.Member)
}
func (m *delegateMock) LANMembersAllSegments() ([]serf.Member, error) {
ret := m.Called()
return ret.Get(0).([]serf.Member), ret.Error(1)
}
func (m *delegateMock) LANSegmentMembers(segment string) ([]serf.Member, error) {
ret := m.Called()
func (m *delegateMock) LANMembers(f consul.LANMemberFilter) ([]serf.Member, error) {
ret := m.Called(f)
return ret.Get(0).([]serf.Member), ret.Error(1)
}
func (m *delegateMock) LocalMember() serf.Member {
func (m *delegateMock) AgentLocalMember() serf.Member {
return m.Called().Get(0).(serf.Member)
}
func (m *delegateMock) JoinLAN(addrs []string) (n int, err error) {
ret := m.Called(addrs)
func (m *delegateMock) JoinLAN(addrs []string, entMeta *structs.EnterpriseMeta) (n int, err error) {
ret := m.Called(addrs, entMeta)
return ret.Int(0), ret.Error(1)
}
func (m *delegateMock) RemoveFailedNode(node string, prune bool) error {
return m.Called(node, prune).Error(0)
func (m *delegateMock) RemoveFailedNode(node string, prune bool, entMeta *structs.EnterpriseMeta) error {
return m.Called(node, prune, entMeta).Error(0)
}
func (m *delegateMock) ResolveTokenToIdentity(token string) (structs.ACLIdentity, error) {

@ -136,7 +136,7 @@ func NewDNSServer(a *Agent) (*DNSServer, error) {
domain: domain,
altDomain: altDomain,
logger: a.logger.Named(logging.DNS),
defaultEnterpriseMeta: *a.agentEnterpriseMeta(),
defaultEnterpriseMeta: *a.AgentEnterpriseMeta(),
}
cfg, err := GetDNSConfig(a.config)
if err != nil {

@ -6250,7 +6250,7 @@ func TestDNS_NonExistentDC_RPC(t *testing.T) {
// Join LAN cluster
addr := fmt.Sprintf("127.0.0.1:%d", s.Config.SerfPortLAN)
_, err := c.JoinLAN([]string{addr})
_, err := c.JoinLAN([]string{addr}, nil)
require.NoError(t, err)
testrpc.WaitForTestAgent(t, c.RPC, "dc1")

@ -5,10 +5,11 @@ import (
"strings"
"time"
"github.com/hashicorp/consul/lib"
discover "github.com/hashicorp/go-discover"
discoverk8s "github.com/hashicorp/go-discover/provider/k8s"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/lib"
)
func (a *Agent) retryJoinLAN() {
@ -18,8 +19,12 @@ func (a *Agent) retryJoinLAN() {
addrs: a.config.RetryJoinLAN,
maxAttempts: a.config.RetryJoinMaxAttemptsLAN,
interval: a.config.RetryJoinIntervalLAN,
join: a.JoinLAN,
logger: a.logger.With("cluster", "LAN"),
join: func(addrs []string) (int, error) {
// NOTE: For partitioned servers you are only capable of using retry join
// to join nodes in the default partition.
return a.JoinLAN(addrs, a.AgentEnterpriseMeta())
},
logger: a.logger.With("cluster", "LAN"),
}
if err := r.retryJoin(); err != nil {
a.retryJoinCh <- err

@ -330,7 +330,7 @@ func TestServiceManager_PersistService_API(t *testing.T) {
// Join first
_, err := a.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN),
})
}, nil)
require.NoError(err)
testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -589,7 +589,7 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) {
// Join first
_, err := a.JoinLAN([]string{
fmt.Sprintf("127.0.0.1:%d", serverAgent.Config.SerfPortLAN),
})
}, nil)
require.NoError(t, err)
testrpc.WaitForLeader(t, a.RPC, "dc1")

@ -75,6 +75,14 @@ func (m *EnterpriseMeta) PartitionOrDefault() string {
return "default"
}
func EqualPartitions(_, _ string) bool {
return true
}
func IsDefaultPartition(partition string) bool {
return true
}
func PartitionOrDefault(_ string) string {
return "default"
}

@ -114,7 +114,9 @@ func (c *cmd) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error {
}
c.logger.Info("Joining cluster")
n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN)
// NOTE: For partitioned servers you are only capable of using start join
// to join nodes in the default partition.
n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN, agent.AgentEnterpriseMeta())
if err != nil {
return err
}

@ -101,7 +101,7 @@ func TestRetryJoin(t *testing.T) {
testrpc.WaitForLeader(t, a.RPC, "dc1")
retry.Run(t, func(r *retry.R) {
if got, want := len(a.LANMembers()), 2; got != want {
if got, want := len(a.LANMembersInAgentPartition()), 2; got != want {
r.Fatalf("got %d LAN members want %d", got, want)
}
})

@ -4,10 +4,11 @@ import (
"strings"
"testing"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/serf/serf"
"github.com/mitchellh/cli"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/sdk/testutil/retry"
)
func TestForceLeaveCommand_noTabs(t *testing.T) {
@ -28,7 +29,7 @@ func TestForceLeaveCommand(t *testing.T) {
defer a1.Shutdown()
defer a2.Shutdown()
_, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()})
_, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()}, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -48,12 +49,12 @@ func TestForceLeaveCommand(t *testing.T) {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
m := a1.LANMembers()
m := a1.LANMembersInAgentPartition()
if len(m) != 2 {
t.Fatalf("should have 2 members: %#v", m)
}
retry.Run(t, func(r *retry.R) {
m = a1.LANMembers()
m = a1.LANMembersInAgentPartition()
if got, want := m[1].Status, serf.StatusLeft; got != want {
r.Fatalf("got status %q want %q", got, want)
}
@ -93,7 +94,7 @@ func TestForceLeaveCommand_prune(t *testing.T) {
a2 := agent.StartTestAgent(t, agent.TestAgent{Name: "Agent2"})
defer a2.Shutdown()
_, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()})
_, err := a2.JoinLAN([]string{a1.Config.SerfBindAddrLAN.String()}, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -114,7 +115,7 @@ func TestForceLeaveCommand_prune(t *testing.T) {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
retry.Run(t, func(r *retry.R) {
m := len(a1.LANMembers())
m := len(a1.LANMembersInAgentPartition())
if m != 1 {
r.Fatalf("should have 1 members, got %#v", m)
}

@ -38,8 +38,8 @@ func TestJoinCommandJoin_lan(t *testing.T) {
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
}
if len(a1.LANMembers()) != 2 {
t.Fatalf("bad: %#v", a1.LANMembers())
if len(a1.LANMembersInAgentPartition()) != 2 {
t.Fatalf("bad: %#v", a1.LANMembersInAgentPartition())
}
}

Loading…
Cancel
Save