mirror of https://github.com/hashicorp/consul
Removes remoteConsuls in favor of the new router.
This has the next wave of RTT integration with the router and also factors some common RTT-related helpers out to lib. While we were in here we also got rid of the coordinate disable config so we don't need to deal with the complexity in the router (there was never a user-visible way to disable coordinates).pull/2801/head
parent
dc1572d82b
commit
1091c7314e
|
@ -242,16 +242,6 @@ func TestAgent_Self(t *testing.T) {
|
|||
t.Fatalf("meta fields are not equal: %v != %v", meta, val.Meta)
|
||||
}
|
||||
|
||||
srv.agent.config.DisableCoordinates = true
|
||||
obj, err = srv.AgentSelf(nil, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
val = obj.(AgentSelf)
|
||||
if val.Coord != nil {
|
||||
t.Fatalf("should have been nil: %v", val.Coord)
|
||||
}
|
||||
|
||||
// Make sure there's nothing called "token" that's leaked.
|
||||
raw, err := srv.marshalJSON(req, obj)
|
||||
if err != nil {
|
||||
|
|
|
@ -77,7 +77,6 @@ func nextConfig() *Config {
|
|||
cons.RaftConfig.HeartbeatTimeout = 40 * time.Millisecond
|
||||
cons.RaftConfig.ElectionTimeout = 40 * time.Millisecond
|
||||
|
||||
cons.DisableCoordinates = false
|
||||
cons.CoordinateUpdatePeriod = 100 * time.Millisecond
|
||||
return conf
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ Usage: consul rtt [options] node1 [node2]
|
|||
the datacenter (eg. "myserver.dc1").
|
||||
|
||||
It is not possible to measure between LAN coordinates and WAN coordinates
|
||||
because they are maintained by independent Serf gossip pools, so they are
|
||||
because they are maintained by independent Serf gossip areas, so they are
|
||||
not compatible.
|
||||
|
||||
` + c.Command.Help()
|
||||
|
|
|
@ -149,7 +149,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
|
|||
|
||||
// ListDatacenters is used to query for the list of known datacenters
|
||||
func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
|
||||
dcs, err := c.srv.getDatacentersByDistance()
|
||||
dcs, err := c.srv.router.GetDatacentersByDistance()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -873,9 +873,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {
|
|||
|
||||
// Set all but one of the nodes to known coordinates.
|
||||
updates := structs.Coordinates{
|
||||
{"foo", generateCoordinate(2 * time.Millisecond)},
|
||||
{"bar", generateCoordinate(5 * time.Millisecond)},
|
||||
{"baz", generateCoordinate(1 * time.Millisecond)},
|
||||
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
|
||||
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
|
||||
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
|
||||
}
|
||||
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -1467,9 +1467,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
|
|||
|
||||
// Set all but one of the nodes to known coordinates.
|
||||
updates := structs.Coordinates{
|
||||
{"foo", generateCoordinate(2 * time.Millisecond)},
|
||||
{"bar", generateCoordinate(5 * time.Millisecond)},
|
||||
{"baz", generateCoordinate(1 * time.Millisecond)},
|
||||
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)},
|
||||
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)},
|
||||
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)},
|
||||
}
|
||||
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
|
|
@ -157,7 +157,6 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
|
|||
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
|
||||
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
|
||||
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter}
|
||||
conf.DisableCoordinates = c.config.DisableCoordinates
|
||||
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -251,9 +251,6 @@ type Config struct {
|
|||
// user events. This function should not block.
|
||||
UserEventHandler func(serf.UserEvent)
|
||||
|
||||
// DisableCoordinates controls features related to network coordinates.
|
||||
DisableCoordinates bool
|
||||
|
||||
// CoordinateUpdatePeriod controls how long a server batches coordinate
|
||||
// updates before applying them in a Raft transaction. A larger period
|
||||
// leads to fewer Raft transactions, but also the stored coordinates
|
||||
|
@ -344,7 +341,6 @@ func DefaultConfig() *Config {
|
|||
TombstoneTTL: 15 * time.Minute,
|
||||
TombstoneTTLGranularity: 30 * time.Second,
|
||||
SessionTTLMin: 10 * time.Second,
|
||||
DisableCoordinates: false,
|
||||
|
||||
// These are tuned to provide a total throughput of 128 updates
|
||||
// per second. If you update these, you should update the client-
|
||||
|
|
|
@ -2,6 +2,7 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -146,6 +147,15 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter
|
|||
return err
|
||||
}
|
||||
|
||||
// Strip the datacenter suffixes from all the node names.
|
||||
for i := range maps {
|
||||
suffix := fmt.Sprintf(".%s", maps[i].Datacenter)
|
||||
for j := range maps[i].Coordinates {
|
||||
node := maps[i].Coordinates[j].Node
|
||||
maps[i].Coordinates[j].Node = strings.TrimSuffix(node, suffix)
|
||||
}
|
||||
}
|
||||
|
||||
*reply = maps
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -167,8 +167,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
updates := structs.Coordinates{
|
||||
{"foo", generateCoordinate(1 * time.Millisecond)},
|
||||
{"bar", generateCoordinate(2 * time.Millisecond)},
|
||||
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
|
||||
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
|
||||
}
|
||||
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -436,8 +436,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
updates := structs.Coordinates{
|
||||
{"foo", generateCoordinate(1 * time.Millisecond)},
|
||||
{"bar", generateCoordinate(2 * time.Millisecond)},
|
||||
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
|
||||
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
|
||||
}
|
||||
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
@ -737,8 +737,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
updates := structs.Coordinates{
|
||||
{"foo", generateCoordinate(1 * time.Millisecond)},
|
||||
{"bar", generateCoordinate(2 * time.Millisecond)},
|
||||
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)},
|
||||
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)},
|
||||
}
|
||||
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
|
|
|
@ -600,7 +600,9 @@ func (q *queryServerWrapper) GetLogger() *log.Logger {
|
|||
// GetOtherDatacentersByDistance calls into the server's fn and filters out the
|
||||
// server's own DC.
|
||||
func (q *queryServerWrapper) GetOtherDatacentersByDistance() ([]string, error) {
|
||||
dcs, err := q.srv.getDatacentersByDistance()
|
||||
// TODO (slackpad) - We should cache this result since it's expensive to
|
||||
// compute.
|
||||
dcs, err := q.srv.router.GetDatacentersByDistance()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
203
consul/rtt.go
203
consul/rtt.go
|
@ -2,24 +2,13 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
// computeDistance returns the distance between the two network coordinates in
|
||||
// seconds. If either of the coordinates is nil then this will return positive
|
||||
// infinity.
|
||||
func computeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
|
||||
if a == nil || b == nil {
|
||||
return math.Inf(1.0)
|
||||
}
|
||||
|
||||
return a.DistanceTo(b).Seconds()
|
||||
}
|
||||
|
||||
// nodeSorter takes a list of nodes and a parallel vector of distances and
|
||||
// implements sort.Interface, keeping both structures coherent and sorting by
|
||||
// distance.
|
||||
|
@ -38,7 +27,7 @@ func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (s
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vec[i] = computeDistance(c, coord)
|
||||
vec[i] = lib.ComputeDistance(c, coord)
|
||||
}
|
||||
return &nodeSorter{nodes, vec}, nil
|
||||
}
|
||||
|
@ -77,7 +66,7 @@ func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.Se
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vec[i] = computeDistance(c, coord)
|
||||
vec[i] = lib.ComputeDistance(c, coord)
|
||||
}
|
||||
return &serviceNodeSorter{nodes, vec}, nil
|
||||
}
|
||||
|
@ -116,7 +105,7 @@ func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.H
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vec[i] = computeDistance(c, coord)
|
||||
vec[i] = lib.ComputeDistance(c, coord)
|
||||
}
|
||||
return &healthCheckSorter{checks, vec}, nil
|
||||
}
|
||||
|
@ -155,7 +144,7 @@ func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes struc
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
vec[i] = computeDistance(c, coord)
|
||||
vec[i] = lib.ComputeDistance(c, coord)
|
||||
}
|
||||
return &checkServiceNodeSorter{nodes, vec}, nil
|
||||
}
|
||||
|
@ -198,12 +187,6 @@ func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interfac
|
|||
//
|
||||
// If coordinates are disabled this will be a no-op.
|
||||
func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interface{}) error {
|
||||
// Make it safe to call this without having to check if coordinates are
|
||||
// disabled first.
|
||||
if s.config.DisableCoordinates {
|
||||
return nil
|
||||
}
|
||||
|
||||
// We can't sort if there's no source node.
|
||||
if source.Node == "" {
|
||||
return nil
|
||||
|
@ -233,179 +216,3 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
|
|||
sort.Stable(sorter)
|
||||
return nil
|
||||
}
|
||||
|
||||
// serfer provides the coordinate information we need from the Server in an
|
||||
// interface that's easy to mock out for testing. Without this, we'd have to
|
||||
// do some really painful setup to get good unit test coverage of all the cases.
|
||||
type serfer interface {
|
||||
GetDatacenter() string
|
||||
GetCoordinate() (*coordinate.Coordinate, error)
|
||||
GetCachedCoordinate(node string) (*coordinate.Coordinate, bool)
|
||||
GetNodesForDatacenter(dc string) []string
|
||||
}
|
||||
|
||||
// serverSerfer wraps a Server with the serfer interface.
|
||||
type serverSerfer struct {
|
||||
server *Server
|
||||
}
|
||||
|
||||
// See serfer.
|
||||
func (s *serverSerfer) GetDatacenter() string {
|
||||
return s.server.config.Datacenter
|
||||
}
|
||||
|
||||
// See serfer.
|
||||
func (s *serverSerfer) GetCoordinate() (*coordinate.Coordinate, error) {
|
||||
return s.server.serfWAN.GetCoordinate()
|
||||
}
|
||||
|
||||
// See serfer.
|
||||
func (s *serverSerfer) GetCachedCoordinate(node string) (*coordinate.Coordinate, bool) {
|
||||
return s.server.serfWAN.GetCachedCoordinate(node)
|
||||
}
|
||||
|
||||
// See serfer.
|
||||
func (s *serverSerfer) GetNodesForDatacenter(dc string) []string {
|
||||
s.server.remoteLock.RLock()
|
||||
defer s.server.remoteLock.RUnlock()
|
||||
|
||||
nodes := make([]string, 0)
|
||||
for _, part := range s.server.remoteConsuls[dc] {
|
||||
nodes = append(nodes, part.Name)
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// getDatacenterDistance will return the median round trip time estimate for
|
||||
// the given DC from the given serfer, in seconds. This will return positive
|
||||
// infinity if no coordinates are available.
|
||||
func getDatacenterDistance(s serfer, dc string) (float64, error) {
|
||||
// If this is the serfer's DC then just bail with zero RTT.
|
||||
if dc == s.GetDatacenter() {
|
||||
return 0.0, nil
|
||||
}
|
||||
|
||||
// Otherwise measure from the serfer to the nodes in the other DC.
|
||||
coord, err := s.GetCoordinate()
|
||||
if err != nil {
|
||||
return 0.0, err
|
||||
}
|
||||
|
||||
// Fetch all the nodes in the DC and record their distance, if available.
|
||||
nodes := s.GetNodesForDatacenter(dc)
|
||||
subvec := make([]float64, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
if other, ok := s.GetCachedCoordinate(node); ok {
|
||||
subvec = append(subvec, computeDistance(coord, other))
|
||||
}
|
||||
}
|
||||
|
||||
// Compute the median by sorting and taking the middle item.
|
||||
if len(subvec) > 0 {
|
||||
sort.Float64s(subvec)
|
||||
return subvec[len(subvec)/2], nil
|
||||
}
|
||||
|
||||
// Return the default infinity value.
|
||||
return computeDistance(coord, nil), nil
|
||||
}
|
||||
|
||||
// datacenterSorter takes a list of DC names and a parallel vector of distances
|
||||
// and implements sort.Interface, keeping both structures coherent and sorting
|
||||
// by distance.
|
||||
type datacenterSorter struct {
|
||||
Names []string
|
||||
Vec []float64
|
||||
}
|
||||
|
||||
// See sort.Interface.
|
||||
func (n *datacenterSorter) Len() int {
|
||||
return len(n.Names)
|
||||
}
|
||||
|
||||
// See sort.Interface.
|
||||
func (n *datacenterSorter) Swap(i, j int) {
|
||||
n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
|
||||
n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
|
||||
}
|
||||
|
||||
// See sort.Interface.
|
||||
func (n *datacenterSorter) Less(i, j int) bool {
|
||||
return n.Vec[i] < n.Vec[j]
|
||||
}
|
||||
|
||||
// sortDatacentersByDistance will sort the given list of DCs based on the
|
||||
// median RTT to all nodes the given serfer knows about from the WAN gossip
|
||||
// pool). DCs with missing coordinates will be stable sorted to the end of the
|
||||
// list.
|
||||
func sortDatacentersByDistance(s serfer, dcs []string) error {
|
||||
// Build up a list of median distances to the other DCs.
|
||||
vec := make([]float64, len(dcs))
|
||||
for i, dc := range dcs {
|
||||
rtt, err := getDatacenterDistance(s, dc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
vec[i] = rtt
|
||||
}
|
||||
|
||||
sorter := &datacenterSorter{dcs, vec}
|
||||
sort.Stable(sorter)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getDatacenterMaps returns the raw coordinates of all the nodes in the
|
||||
// given list of DCs (the output list will preserve the incoming order).
|
||||
func (s *Server) getDatacenterMaps(dcs []string) []structs.DatacenterMap {
|
||||
serfer := serverSerfer{s}
|
||||
return getDatacenterMaps(&serfer, dcs)
|
||||
}
|
||||
|
||||
// getDatacenterMaps returns the raw coordinates of all the nodes in the
|
||||
// given list of DCs (the output list will preserve the incoming order).
|
||||
func getDatacenterMaps(s serfer, dcs []string) []structs.DatacenterMap {
|
||||
maps := make([]structs.DatacenterMap, 0, len(dcs))
|
||||
for _, dc := range dcs {
|
||||
m := structs.DatacenterMap{Datacenter: dc}
|
||||
nodes := s.GetNodesForDatacenter(dc)
|
||||
for _, node := range nodes {
|
||||
if coord, ok := s.GetCachedCoordinate(node); ok {
|
||||
entry := &structs.Coordinate{Node: node, Coord: coord}
|
||||
m.Coordinates = append(m.Coordinates, entry)
|
||||
}
|
||||
}
|
||||
maps = append(maps, m)
|
||||
}
|
||||
return maps
|
||||
}
|
||||
|
||||
// getDatacentersByDistance will return the list of DCs, sorted in order
|
||||
// of increasing distance based on the median distance to that DC from all
|
||||
// servers we know about in the WAN gossip pool. This will sort by name all
|
||||
// other things being equal (or if coordinates are disabled).
|
||||
func (s *Server) getDatacentersByDistance() ([]string, error) {
|
||||
s.remoteLock.RLock()
|
||||
dcs := make([]string, 0, len(s.remoteConsuls))
|
||||
for dc := range s.remoteConsuls {
|
||||
dcs = append(dcs, dc)
|
||||
}
|
||||
s.remoteLock.RUnlock()
|
||||
|
||||
// Sort by name first, since the coordinate sort is stable.
|
||||
sort.Strings(dcs)
|
||||
|
||||
// Make it safe to call this without having to check if coordinates are
|
||||
// disabled first.
|
||||
if s.config.DisableCoordinates {
|
||||
return dcs, nil
|
||||
}
|
||||
|
||||
// Do the sort!
|
||||
serfer := serverSerfer{s}
|
||||
if err := sortDatacentersByDistance(&serfer, dcs); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dcs, nil
|
||||
}
|
||||
|
|
|
@ -2,29 +2,18 @@ package consul
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"net/rpc"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/testutil"
|
||||
"github.com/hashicorp/net-rpc-msgpackrpc"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
// generateCoordinate creates a new coordinate with the given distance from the
|
||||
// origin.
|
||||
func generateCoordinate(rtt time.Duration) *coordinate.Coordinate {
|
||||
coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
|
||||
coord.Vec[0] = rtt.Seconds()
|
||||
coord.Height = 0
|
||||
return coord
|
||||
}
|
||||
|
||||
// verifyNodeSort makes sure the order of the nodes in the slice is the same as
|
||||
// the expected order, expressed as a comma-separated string.
|
||||
func verifyNodeSort(t *testing.T, nodes structs.Nodes, expected string) {
|
||||
|
@ -106,27 +95,27 @@ func seedCoordinates(t *testing.T, codec rpc.ClientCodec, server *Server) {
|
|||
structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node1",
|
||||
Coord: generateCoordinate(10 * time.Millisecond),
|
||||
Coord: lib.GenerateCoordinate(10 * time.Millisecond),
|
||||
},
|
||||
structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node2",
|
||||
Coord: generateCoordinate(2 * time.Millisecond),
|
||||
Coord: lib.GenerateCoordinate(2 * time.Millisecond),
|
||||
},
|
||||
structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node3",
|
||||
Coord: generateCoordinate(1 * time.Millisecond),
|
||||
Coord: lib.GenerateCoordinate(1 * time.Millisecond),
|
||||
},
|
||||
structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node4",
|
||||
Coord: generateCoordinate(8 * time.Millisecond),
|
||||
Coord: lib.GenerateCoordinate(8 * time.Millisecond),
|
||||
},
|
||||
structs.CoordinateUpdateRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "node5",
|
||||
Coord: generateCoordinate(3 * time.Millisecond),
|
||||
Coord: lib.GenerateCoordinate(3 * time.Millisecond),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -183,19 +172,10 @@ func TestRTT_sortNodesByDistanceFrom(t *testing.T) {
|
|||
}
|
||||
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||
|
||||
// Set source to legit values relative to node1 but disable coordinates.
|
||||
// Now sort relative to node1, note that apple doesn't have any seeded
|
||||
// coordinate info so it should end up at the end, despite its lexical
|
||||
// hegemony.
|
||||
source.Node = "node1"
|
||||
source.Datacenter = "dc1"
|
||||
server.config.DisableCoordinates = true
|
||||
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
verifyNodeSort(t, nodes, "apple,node1,node2,node3,node4,node5")
|
||||
|
||||
// Now enable coordinates and sort relative to node1, note that apple
|
||||
// doesn't have any seeded coordinate info so it should end up at the
|
||||
// end, despite its lexical hegemony.
|
||||
server.config.DisableCoordinates = false
|
||||
if err := server.sortNodesByDistanceFrom(source, nodes); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
@ -398,6 +378,8 @@ func TestRTT_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
|
|||
verifyCheckServiceNodeSort(t, nodes, "node2,node3,node5,node4,node1,apple")
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
// mockNodeMap is keyed by node name and the values are the coordinates of the
|
||||
// node.
|
||||
type mockNodeMap map[string]*coordinate.Coordinate
|
||||
|
@ -422,16 +404,16 @@ type mockServer map[string]mockNodeMap
|
|||
func newMockServer() *mockServer {
|
||||
s := make(mockServer)
|
||||
s["dc0"] = mockNodeMap{
|
||||
"dc0.node1": generateCoordinate(10 * time.Millisecond),
|
||||
"dc0.node1": lib.GenerateCoordinate(10 * time.Millisecond),
|
||||
}
|
||||
s["dc1"] = mockNodeMap{
|
||||
"dc1.node1": generateCoordinate(3 * time.Millisecond),
|
||||
"dc1.node2": generateCoordinate(2 * time.Millisecond),
|
||||
"dc1.node3": generateCoordinate(5 * time.Millisecond),
|
||||
"dc1.node1": lib.GenerateCoordinate(3 * time.Millisecond),
|
||||
"dc1.node2": lib.GenerateCoordinate(2 * time.Millisecond),
|
||||
"dc1.node3": lib.GenerateCoordinate(5 * time.Millisecond),
|
||||
"dc1.node4": nil, // no known coordinate
|
||||
}
|
||||
s["dc2"] = mockNodeMap{
|
||||
"dc2.node1": generateCoordinate(8 * time.Millisecond),
|
||||
"dc2.node1": lib.GenerateCoordinate(8 * time.Millisecond),
|
||||
}
|
||||
s["dcX"] = mockNodeMap{
|
||||
"dcX.node1": nil, // no known coordinate
|
||||
|
@ -548,7 +530,7 @@ func TestRTT_getDatacenterMaps(t *testing.T) {
|
|||
t.Fatalf("bad: %v", maps[0])
|
||||
}
|
||||
verifyCoordinatesEqual(t, maps[0].Coordinates[0].Coord,
|
||||
generateCoordinate(10*time.Millisecond))
|
||||
lib.GenerateCoordinate(10*time.Millisecond))
|
||||
|
||||
if maps[1].Datacenter != "acdc" || len(maps[1].Coordinates) != 0 {
|
||||
t.Fatalf("bad: %v", maps[1])
|
||||
|
@ -561,18 +543,18 @@ func TestRTT_getDatacenterMaps(t *testing.T) {
|
|||
t.Fatalf("bad: %v", maps[2])
|
||||
}
|
||||
verifyCoordinatesEqual(t, maps[2].Coordinates[0].Coord,
|
||||
generateCoordinate(3*time.Millisecond))
|
||||
lib.GenerateCoordinate(3*time.Millisecond))
|
||||
verifyCoordinatesEqual(t, maps[2].Coordinates[1].Coord,
|
||||
generateCoordinate(2*time.Millisecond))
|
||||
lib.GenerateCoordinate(2*time.Millisecond))
|
||||
verifyCoordinatesEqual(t, maps[2].Coordinates[2].Coord,
|
||||
generateCoordinate(5*time.Millisecond))
|
||||
lib.GenerateCoordinate(5*time.Millisecond))
|
||||
|
||||
if maps[3].Datacenter != "dc2" || len(maps[3].Coordinates) != 1 ||
|
||||
maps[3].Coordinates[0].Node != "dc2.node1" {
|
||||
t.Fatalf("bad: %v", maps[3])
|
||||
}
|
||||
verifyCoordinatesEqual(t, maps[3].Coordinates[0].Coord,
|
||||
generateCoordinate(8*time.Millisecond))
|
||||
lib.GenerateCoordinate(8*time.Millisecond))
|
||||
|
||||
if maps[4].Datacenter != "dcX" || len(maps[4].Coordinates) != 0 {
|
||||
t.Fatalf("bad: %v", maps[4])
|
||||
|
@ -646,3 +628,5 @@ func TestRTT_getDatacentersByDistance(t *testing.T) {
|
|||
t.Fatalf("bad: %v", dcs)
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
|
|
|
@ -137,11 +137,6 @@ type Server struct {
|
|||
// updated
|
||||
reconcileCh chan serf.Member
|
||||
|
||||
// remoteConsuls is used to track the known consuls in
|
||||
// remote datacenters. Used to do DC forwarding.
|
||||
remoteConsuls map[string][]*agent.Server
|
||||
remoteLock sync.RWMutex
|
||||
|
||||
// router is used to map out Consul servers in the WAN and in Consul
|
||||
// Enterprise user-defined areas.
|
||||
router *servers.Router
|
||||
|
@ -256,8 +251,7 @@ func NewServer(config *Config) (*Server, error) {
|
|||
localConsuls: make(map[raft.ServerAddress]*agent.Server),
|
||||
logger: logger,
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
remoteConsuls: make(map[string][]*agent.Server, 4),
|
||||
router: servers.NewRouter(logger, shutdownCh),
|
||||
router: servers.NewRouter(logger, shutdownCh, config.Datacenter),
|
||||
rpcServer: rpc.NewServer(),
|
||||
rpcTLS: incomingTLS,
|
||||
tombstoneGC: gc,
|
||||
|
@ -386,9 +380,6 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Plumb down the enable coordinates flag.
|
||||
conf.DisableCoordinates = s.config.DisableCoordinates
|
||||
|
||||
return serf.Create(conf)
|
||||
}
|
||||
|
||||
|
|
|
@ -73,7 +73,6 @@ func testServerConfig(t *testing.T, NodeName string) (string, *Config) {
|
|||
|
||||
config.ReconcileInterval = 100 * time.Millisecond
|
||||
|
||||
config.DisableCoordinates = false
|
||||
config.CoordinateUpdatePeriod = 100 * time.Millisecond
|
||||
return dir, config
|
||||
}
|
||||
|
@ -214,13 +213,13 @@ func TestServer_JoinWAN(t *testing.T) {
|
|||
t.Fatalf("bad len")
|
||||
})
|
||||
|
||||
// Check the remoteConsuls has both
|
||||
if len(s1.remoteConsuls) != 2 {
|
||||
// Check the router has both
|
||||
if len(s1.router.GetDatacenters()) != 2 {
|
||||
t.Fatalf("remote consul missing")
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return len(s2.remoteConsuls) == 2, nil
|
||||
return len(s2.router.GetDatacenters()) == 2, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("remote consul missing")
|
||||
})
|
||||
|
@ -289,12 +288,12 @@ func TestServer_JoinSeparateLanAndWanAddresses(t *testing.T) {
|
|||
t.Fatalf("bad len")
|
||||
})
|
||||
|
||||
// Check the remoteConsuls has both
|
||||
if len(s1.remoteConsuls) != 2 {
|
||||
// Check the router has both
|
||||
if len(s1.router.GetDatacenters()) != 2 {
|
||||
t.Fatalf("remote consul missing")
|
||||
}
|
||||
|
||||
if len(s2.remoteConsuls) != 2 {
|
||||
if len(s2.router.GetDatacenters()) != 2 {
|
||||
t.Fatalf("remote consul missing")
|
||||
}
|
||||
|
||||
|
@ -693,9 +692,9 @@ func TestServer_globalRPCErrors(t *testing.T) {
|
|||
defer s1.Shutdown()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return len(s1.remoteConsuls) == 1, nil
|
||||
return len(s1.router.GetDatacenters()) == 1, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("Server did not join LAN successfully")
|
||||
t.Fatalf("Server did not join WAN successfully")
|
||||
})
|
||||
|
||||
// Check that an error from a remote DC is returned
|
||||
|
|
|
@ -3,10 +3,12 @@ package servers
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/consul/consul/agent"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/types"
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
"github.com/hashicorp/serf/serf"
|
||||
|
@ -15,8 +17,9 @@ import (
|
|||
type Router struct {
|
||||
logger *log.Logger
|
||||
|
||||
areas map[types.AreaID]*areaInfo
|
||||
managers map[string][]*Manager
|
||||
localDatacenter string
|
||||
areas map[types.AreaID]*areaInfo
|
||||
managers map[string][]*Manager
|
||||
|
||||
// This top-level lock covers all the internal state.
|
||||
sync.RWMutex
|
||||
|
@ -42,11 +45,12 @@ type areaInfo struct {
|
|||
managers map[string]*managerInfo
|
||||
}
|
||||
|
||||
func NewRouter(logger *log.Logger, shutdownCh chan struct{}) *Router {
|
||||
func NewRouter(logger *log.Logger, shutdownCh chan struct{}, localDatacenter string) *Router {
|
||||
router := &Router{
|
||||
logger: logger,
|
||||
areas: make(map[types.AreaID]*areaInfo),
|
||||
managers: make(map[string][]*Manager),
|
||||
logger: logger,
|
||||
localDatacenter: localDatacenter,
|
||||
areas: make(map[types.AreaID]*areaInfo),
|
||||
managers: make(map[string][]*Manager),
|
||||
}
|
||||
|
||||
// This will propagate a top-level shutdown to all the managers.
|
||||
|
@ -203,9 +207,105 @@ func (r *Router) GetDatacenters() []string {
|
|||
for dc, _ := range r.managers {
|
||||
dcs = append(dcs, dc)
|
||||
}
|
||||
|
||||
sort.Strings(dcs)
|
||||
return dcs
|
||||
}
|
||||
|
||||
// datacenterSorter takes a list of DC names and a parallel vector of distances
|
||||
// and implements sort.Interface, keeping both structures coherent and sorting
|
||||
// by distance.
|
||||
type datacenterSorter struct {
|
||||
Names []string
|
||||
Vec []float64
|
||||
}
|
||||
|
||||
// See sort.Interface.
|
||||
func (n *datacenterSorter) Len() int {
|
||||
return len(n.Names)
|
||||
}
|
||||
|
||||
// See sort.Interface.
|
||||
func (n *datacenterSorter) Swap(i, j int) {
|
||||
n.Names[i], n.Names[j] = n.Names[j], n.Names[i]
|
||||
n.Vec[i], n.Vec[j] = n.Vec[j], n.Vec[i]
|
||||
}
|
||||
|
||||
// See sort.Interface.
|
||||
func (n *datacenterSorter) Less(i, j int) bool {
|
||||
return n.Vec[i] < n.Vec[j]
|
||||
}
|
||||
|
||||
func (r *Router) GetDatacentersByDistance() ([]string, error) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
// Calculate a median RTT to the servers in each datacenter, by area.
|
||||
dcs := make(map[string]float64)
|
||||
for areaID, info := range r.areas {
|
||||
index := make(map[string][]float64)
|
||||
coord, err := info.cluster.GetCoordinate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, m := range info.cluster.Members() {
|
||||
ok, parts := agent.IsConsulServer(m)
|
||||
if !ok {
|
||||
r.logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
||||
m.Name, areaID)
|
||||
continue
|
||||
}
|
||||
|
||||
existing := index[parts.Datacenter]
|
||||
if parts.Datacenter == r.localDatacenter {
|
||||
// Everything in the local datacenter looks like zero RTT.
|
||||
index[parts.Datacenter] = append(existing, 0.0)
|
||||
} else {
|
||||
// It's OK to get a nil coordinate back, ComputeDistance
|
||||
// will put the RTT at positive infinity.
|
||||
other, _ := info.cluster.GetCachedCoordinate(parts.Name)
|
||||
rtt := lib.ComputeDistance(coord, other)
|
||||
index[parts.Datacenter] = append(existing, rtt)
|
||||
}
|
||||
}
|
||||
|
||||
// Compute the median RTT between this server and the servers
|
||||
// in each datacenter. We accumulate the lowest RTT to each DC
|
||||
// in the master map, since a given DC might appear in multiple
|
||||
// areas.
|
||||
for dc, rtts := range index {
|
||||
var rtt float64
|
||||
if len(rtts) > 0 {
|
||||
sort.Float64s(rtts)
|
||||
rtt = rtts[len(rtts)/2]
|
||||
} else {
|
||||
rtt = lib.ComputeDistance(coord, nil)
|
||||
}
|
||||
|
||||
current, ok := dcs[dc]
|
||||
if !ok || (ok && rtt < current) {
|
||||
dcs[dc] = rtt
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// First sort by DC name, since we do a stable sort later.
|
||||
names := make([]string, 0, len(dcs))
|
||||
for dc, _ := range dcs {
|
||||
names = append(names, dc)
|
||||
}
|
||||
sort.Strings(names)
|
||||
|
||||
// Then stable sort by median RTT.
|
||||
vec := make([]float64, 0, len(dcs))
|
||||
for _, dc := range names {
|
||||
vec = append(vec, dcs[dc])
|
||||
}
|
||||
sort.Stable(&datacenterSorter{names, vec})
|
||||
return names, nil
|
||||
}
|
||||
|
||||
func (r *Router) GetDatacenterMaps() ([]structs.DatacenterMap, error) {
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package lib
|
||||
|
||||
import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/serf/coordinate"
|
||||
)
|
||||
|
||||
// ComputeDistance returns the distance between the two network coordinates in
|
||||
// seconds. If either of the coordinates is nil then this will return positive
|
||||
// infinity.
|
||||
func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64 {
|
||||
if a == nil || b == nil {
|
||||
return math.Inf(1.0)
|
||||
}
|
||||
|
||||
return a.DistanceTo(b).Seconds()
|
||||
}
|
||||
|
||||
// GenerateCoordinate creates a new coordinate with the given distance from the
|
||||
// origin. This should only be used for tests.
|
||||
func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {
|
||||
coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
|
||||
coord.Vec[0] = rtt.Seconds()
|
||||
coord.Height = 0
|
||||
return coord
|
||||
}
|
|
@ -6,4 +6,4 @@ type AreaID string
|
|||
|
||||
// This represents the existing WAN area that's built in to Consul. Consul
|
||||
// Enterprise generalizes areas, which are represented with UUIDs.
|
||||
const AreaWAN AreaID = "WAN"
|
||||
const AreaWAN AreaID = "wan"
|
||||
|
|
|
@ -35,7 +35,7 @@ Consul as the `consul members` command would show, not IP addresses.
|
|||
datacenter and the LAN coordinates are used. If the -wan option is given,
|
||||
then the WAN coordinates are used, and the node names must be suffixed by a period
|
||||
and the datacenter (eg. "myserver.dc1"). It is not possible to measure between
|
||||
LAN coordinates and WAN coordinates, so both nodes must be in the same pool.
|
||||
LAN coordinates and WAN coordinates, so both nodes must be in the same area.
|
||||
|
||||
The following environment variables control accessing the HTTP server via SSL:
|
||||
|
||||
|
|
Loading…
Reference in New Issue