Adds open source side of network segments (feature is Enterprise-only).

pull/3431/head
James Phillips 2017-08-14 07:36:07 -07:00 committed by Kyle Havlovitz
parent 9ef2156195
commit b1a15e0c3d
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
44 changed files with 1089 additions and 452 deletions

View File

@ -58,7 +58,7 @@ cov:
test: dev-build vet test: dev-build vet
go test -tags '$(GOTAGS)' -i ./... go test -tags '$(GOTAGS)' -i ./...
go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test$(GOTEST_FLAGS).log ; echo $$? > exit-code go test $(GOTEST_FLAGS) -tags '$(GOTAGS)' -timeout 7m -v ./... 2>&1 >test.log ; echo $$? > exit-code
@echo "Exit code: `cat exit-code`" >> test$(GOTEST_FLAGS).log @echo "Exit code: `cat exit-code`" >> test$(GOTEST_FLAGS).log
@echo "----" @echo "----"
@grep -A5 'DATA RACE' test.log || true @grep -A5 'DATA RACE' test.log || true

View File

@ -32,7 +32,6 @@ import (
"github.com/hashicorp/consul/watch" "github.com/hashicorp/consul/watch"
"github.com/hashicorp/go-uuid" "github.com/hashicorp/go-uuid"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"github.com/shirou/gopsutil/host" "github.com/shirou/gopsutil/host"
) )
@ -56,9 +55,10 @@ const (
// consul.Client and consul.Server. // consul.Client and consul.Server.
type delegate interface { type delegate interface {
Encrypted() bool Encrypted() bool
GetLANCoordinate() (*coordinate.Coordinate, error) GetLANCoordinate() (lib.CoordinateSet, error)
Leave() error Leave() error
LANMembers() []serf.Member LANMembers() []serf.Member
LANSegmentMembers(name string) ([]serf.Member, error)
LocalMember() serf.Member LocalMember() serf.Member
JoinLAN(addrs []string) (n int, err error) JoinLAN(addrs []string) (n int, err error)
RemoveFailedNode(node string) error RemoveFailedNode(node string) error
@ -647,6 +647,32 @@ func (a *Agent) consulConfig() (*consul.Config, error) {
if a.config.AdvertiseAddrs.RPC != nil { if a.config.AdvertiseAddrs.RPC != nil {
base.RPCAdvertise = a.config.AdvertiseAddrs.RPC base.RPCAdvertise = a.config.AdvertiseAddrs.RPC
} }
base.Segment = a.config.Segment
for _, segment := range a.config.Segments {
config := consul.DefaultConfig().SerfLANConfig
config.MemberlistConfig.AdvertiseAddr = segment.Advertise
config.MemberlistConfig.AdvertisePort = segment.Port
config.MemberlistConfig.BindAddr = segment.Bind
config.MemberlistConfig.BindPort = segment.Port
if a.config.ReconnectTimeoutLan != 0 {
config.ReconnectTimeout = a.config.ReconnectTimeoutLan
}
if a.config.EncryptVerifyIncoming != nil {
config.MemberlistConfig.GossipVerifyIncoming = *a.config.EncryptVerifyIncoming
}
if a.config.EncryptVerifyOutgoing != nil {
config.MemberlistConfig.GossipVerifyOutgoing = *a.config.EncryptVerifyOutgoing
}
base.Segments = append(base.Segments, consul.NetworkSegment{
Name: segment.Name,
Bind: segment.Bind,
Port: segment.Port,
Advertise: segment.Advertise,
SerfConfig: config,
})
}
if a.config.Bootstrap { if a.config.Bootstrap {
base.Bootstrap = true base.Bootstrap = true
} }
@ -1154,15 +1180,16 @@ func (a *Agent) ResumeSync() {
a.state.Resume() a.state.Resume()
} }
// GetLANCoordinate returns the coordinate of this node in the local pool (assumes coordinates // GetLANCoordinate returns the coordinates of this node in the local pools
// are enabled, so check that before calling). // (assumes coordinates are enabled, so check that before calling).
func (a *Agent) GetLANCoordinate() (*coordinate.Coordinate, error) { func (a *Agent) GetLANCoordinate() (lib.CoordinateSet, error) {
return a.delegate.GetLANCoordinate() return a.delegate.GetLANCoordinate()
} }
// sendCoordinate is a long-running loop that periodically sends our coordinate // sendCoordinate is a long-running loop that periodically sends our coordinate
// to the server. Closing the agent's shutdownChannel will cause this to exit. // to the server. Closing the agent's shutdownChannel will cause this to exit.
func (a *Agent) sendCoordinate() { func (a *Agent) sendCoordinate() {
OUTER:
for { for {
rate := a.config.SyncCoordinateRateTarget rate := a.config.SyncCoordinateRateTarget
min := a.config.SyncCoordinateIntervalMin min := a.config.SyncCoordinateIntervalMin
@ -1182,16 +1209,18 @@ func (a *Agent) sendCoordinate() {
continue continue
} }
c, err := a.GetLANCoordinate() cs, err := a.GetLANCoordinate()
if err != nil { if err != nil {
a.logger.Printf("[ERR] agent: Failed to get coordinate: %s", err) a.logger.Printf("[ERR] agent: Failed to get coordinate: %s", err)
continue continue
} }
for segment, coord := range cs {
req := structs.CoordinateUpdateRequest{ req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter, Datacenter: a.config.Datacenter,
Node: a.config.NodeName, Node: a.config.NodeName,
Coord: c, Segment: segment,
Coord: coord,
WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()}, WriteRequest: structs.WriteRequest{Token: a.tokens.AgentToken()},
} }
var reply struct{} var reply struct{}
@ -1201,7 +1230,8 @@ func (a *Agent) sendCoordinate() {
} else { } else {
a.logger.Printf("[ERR] agent: Coordinate update error: %v", err) a.logger.Printf("[ERR] agent: Coordinate update error: %v", err)
} }
continue continue OUTER
}
} }
case <-a.shutdownCh: case <-a.shutdownCh:
return return
@ -2105,6 +2135,11 @@ func (a *Agent) loadMetadata(conf *Config) error {
a.state.metadata[key] = value a.state.metadata[key] = value
} }
// The segment isn't reloadable so we only add it once.
if _, ok := a.state.metadata[structs.MetaSegmentKey]; !ok {
a.state.metadata[structs.MetaSegmentKey] = conf.Segment
}
a.state.changeMade() a.state.changeMade()
return nil return nil

View File

@ -11,6 +11,7 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/ipaddr"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/logutils" "github.com/hashicorp/logutils"
@ -27,10 +28,10 @@ type Self struct {
} }
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) { func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var c *coordinate.Coordinate var cs lib.CoordinateSet
if !s.agent.config.DisableCoordinates { if !s.agent.config.DisableCoordinates {
var err error var err error
if c, err = s.agent.GetLANCoordinate(); err != nil { if cs, err = s.agent.GetLANCoordinate(); err != nil {
return nil, err return nil, err
} }
} }
@ -48,7 +49,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
return Self{ return Self{
Config: s.agent.config, Config: s.agent.config,
Coord: c, Coord: cs[s.agent.config.Segment],
Member: s.agent.LocalMember(), Member: s.agent.LocalMember(),
Stats: s.agent.Stats(), Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(), Meta: s.agent.state.Metadata(),
@ -155,11 +156,24 @@ func (s *HTTPServer) AgentMembers(resp http.ResponseWriter, req *http.Request) (
wan = true wan = true
} }
segment := req.URL.Query().Get("segment")
if wan && segment != "" {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, "Cannot provide a segment with wan=true")
return nil, nil
}
var members []serf.Member var members []serf.Member
if wan { if wan {
members = s.agent.WANMembers() members = s.agent.WANMembers()
} else { } else if segment == "" {
members = s.agent.LANMembers() members = s.agent.LANMembers()
} else {
var err error
members, err = s.agent.delegate.LANSegmentMembers(segment)
if err != nil {
return nil, err
}
} }
if err := s.agent.filterMembers(token, &members); err != nil { if err := s.agent.filterMembers(token, &members); err != nil {
return nil, err return nil, err

View File

@ -191,13 +191,14 @@ func TestAgent_Self(t *testing.T) {
t.Fatalf("incorrect port: %v", obj) t.Fatalf("incorrect port: %v", obj)
} }
c, err := a.GetLANCoordinate() cs, err := a.GetLANCoordinate()
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if !reflect.DeepEqual(c, val.Coord) { if c := cs[cfg.Segment]; !reflect.DeepEqual(c, val.Coord) {
t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord) t.Fatalf("coordinates are not equal: %v != %v", c, val.Coord)
} }
delete(val.Meta, structs.MetaSegmentKey) // Added later, not in config.
if !reflect.DeepEqual(cfg.Meta, val.Meta) { if !reflect.DeepEqual(cfg.Meta, val.Meta) {
t.Fatalf("meta fields are not equal: %v != %v", cfg.Meta, val.Meta) t.Fatalf("meta fields are not equal: %v != %v", cfg.Meta, val.Meta)
} }

View File

@ -342,6 +342,22 @@ type Autopilot struct {
UpgradeVersionTag string `mapstructure:"upgrade_version_tag"` UpgradeVersionTag string `mapstructure:"upgrade_version_tag"`
} }
// (Enterprise-only) NetworkSegment is the configuration for a network segment, which is an
// isolated serf group on the LAN.
type NetworkSegment struct {
// Name is the name of the segment.
Name string `mapstructure:"name"`
// Bind is the bind address for this segment.
Bind string `mapstructure:"bind"`
// Port is the port for this segment.
Port int `mapstructure:"port"`
// Advertise is the advertise address of this segment.
Advertise string `mapstructure:"advertise"`
}
// Config is the configuration that can be set for an Agent. // Config is the configuration that can be set for an Agent.
// Some of this is configurable as CLI flags, but most must // Some of this is configurable as CLI flags, but most must
// be set using a configuration file. // be set using a configuration file.
@ -465,6 +481,12 @@ type Config struct {
// Address configurations // Address configurations
Addresses AddressConfig Addresses AddressConfig
// (Enterprise-only) NetworkSegment is the network segment for this client to join
Segment string `mapstructure:"segment"`
// Segments
Segments []NetworkSegment `mapstructure:"segments"`
// Tagged addresses. These are used to publish a set of addresses for // Tagged addresses. These are used to publish a set of addresses for
// for a node, which can be used by the remote agent. We currently // for a node, which can be used by the remote agent. We currently
// populate only the "wan" tag based on the SerfWan advertise address, // populate only the "wan" tag based on the SerfWan advertise address,
@ -1426,6 +1448,11 @@ func DecodeConfig(r io.Reader) (*Config, error) {
} }
} }
// Validate node meta fields
if err := structs.ValidateMetadata(result.Meta); err != nil {
return nil, fmt.Errorf("Failed to parse node metadata: %v", err)
}
return &result, nil return &result, nil
} }
@ -1861,6 +1888,12 @@ func MergeConfig(a, b *Config) *Config {
if b.Addresses.RPC != "" { if b.Addresses.RPC != "" {
result.Addresses.RPC = b.Addresses.RPC result.Addresses.RPC = b.Addresses.RPC
} }
if b.Segment != "" {
result.Segment = b.Segment
}
if len(b.Segments) > 0 {
result.Segments = append(result.Segments, b.Segments...)
}
if b.EnableUI { if b.EnableUI {
result.EnableUI = true result.EnableUI = true
} }
@ -2204,6 +2237,11 @@ func (c *Config) ResolveTmplAddrs() (err error) {
parse(&c.ClientAddr, true, "Client address") parse(&c.ClientAddr, true, "Client address")
parse(&c.SerfLanBindAddr, false, "Serf LAN address") parse(&c.SerfLanBindAddr, false, "Serf LAN address")
parse(&c.SerfWanBindAddr, false, "Serf WAN address") parse(&c.SerfWanBindAddr, false, "Serf WAN address")
for i, segment := range c.Segments {
parse(&c.Segments[i].Bind, false, fmt.Sprintf("Segment %q bind address", segment.Name))
parse(&c.Segments[i].Advertise, false, fmt.Sprintf("Segment %q advertise address", segment.Name))
}
return return
} }

View File

@ -592,6 +592,14 @@ func TestDecodeConfig(t *testing.T) {
in: `{"retry_max_wan":123}`, in: `{"retry_max_wan":123}`,
c: &Config{RetryMaxAttemptsWan: 123}, c: &Config{RetryMaxAttemptsWan: 123},
}, },
{
in: `{"segment":"thing"}`,
c: &Config{Segment: "thing"},
},
{
in: `{"segments":[{"name": "alpha", "bind": "127.0.0.1", "port": 1234, "advertise": "1.1.1.1"}]}`,
c: &Config{Segments: []NetworkSegment{{Name: "alpha", Bind: "127.0.0.1", Port: 1234, Advertise: "1.1.1.1"}}},
},
{ {
in: `{"serf_lan_bind":"1.2.3.4"}`, in: `{"serf_lan_bind":"1.2.3.4"}`,
c: &Config{SerfLanBindAddr: "1.2.3.4"}, c: &Config{SerfLanBindAddr: "1.2.3.4"},
@ -1401,6 +1409,15 @@ func TestMergeConfig(t *testing.T) {
HTTP: "127.0.0.2", HTTP: "127.0.0.2",
HTTPS: "127.0.0.4", HTTPS: "127.0.0.4",
}, },
Segment: "alpha",
Segments: []NetworkSegment{
{
Name: "alpha",
Bind: "127.0.0.1",
Port: 1234,
Advertise: "127.0.0.2",
},
},
Server: true, Server: true,
LeaveOnTerm: Bool(true), LeaveOnTerm: Bool(true),
SkipLeaveOnInt: Bool(true), SkipLeaveOnInt: Bool(true),

View File

@ -890,9 +890,9 @@ func TestCatalog_ListNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates. // Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)}, {Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)}, {Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)}, {Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(5, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -1495,9 +1495,9 @@ func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
// Set all but one of the nodes to known coordinates. // Set all but one of the nodes to known coordinates.
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(2 * time.Millisecond)}, {Node: "foo", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(5 * time.Millisecond)}, {Node: "bar", Coord: lib.GenerateCoordinate(5 * time.Millisecond)},
{"baz", lib.GenerateCoordinate(1 * time.Millisecond)}, {Node: "baz", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(9, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -5,18 +5,14 @@ import (
"io" "io"
"log" "log"
"os" "os"
"path/filepath"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -146,35 +142,6 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
return c, nil return c, nil
} }
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.Init()
conf.NodeName = c.config.NodeName
conf.Tags["role"] = "node"
conf.Tags["dc"] = c.config.Datacenter
conf.Tags["id"] = string(c.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["build"] = c.config.Build
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.Logger = c.logger
conf.EventCh = ch
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// Shutdown is used to shutdown the client // Shutdown is used to shutdown the client
func (c *Client) Shutdown() error { func (c *Client) Shutdown() error {
c.logger.Printf("[INFO] consul: shutting down client") c.logger.Printf("[INFO] consul: shutting down client")
@ -227,6 +194,16 @@ func (c *Client) LANMembers() []serf.Member {
return c.serf.Members() return c.serf.Members()
} }
// LANSegmentMembers only returns our own segment's members, because clients
// can't be in multiple segments.
func (c *Client) LANSegmentMembers(name string) ([]serf.Member, error) {
if name == c.config.Segment {
return c.LANMembers(), nil
}
return nil, fmt.Errorf("segment %q not found", name)
}
// RemoveFailedNode is used to remove a failed node from the cluster // RemoveFailedNode is used to remove a failed node from the cluster
func (c *Client) RemoveFailedNode(node string) error { func (c *Client) RemoveFailedNode(node string) error {
return c.serf.RemoveFailedNode(node) return c.serf.RemoveFailedNode(node)
@ -242,98 +219,6 @@ func (c *Client) Encrypted() bool {
return c.serf.EncryptionEnabled() return c.serf.EncryptionEnabled()
} }
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
if parts.Datacenter != c.config.Datacenter {
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
m.Name, parts.Datacenter)
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
c.routers.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
c.logger.Printf("[INFO] consul: removing server %s", parts)
c.routers.RemoveServer(parts)
}
}
// localEvent is called when we receive an event on the local Serf
func (c *Client) localEvent(event serf.UserEvent) {
// Handle only consul events
if !strings.HasPrefix(event.Name, "consul:") {
return
}
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
}
// RPC is used to forward an RPC call to a consul server, or fail if no servers // RPC is used to forward an RPC call to a consul server, or fail if no servers
func (c *Client) RPC(method string, args interface{}, reply interface{}) error { func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
server := c.routers.FindServer() server := c.routers.FindServer()
@ -413,6 +298,12 @@ func (c *Client) Stats() map[string]map[string]string {
// GetLANCoordinate returns the network coordinate of the current node, as // GetLANCoordinate returns the network coordinate of the current node, as
// maintained by Serf. // maintained by Serf.
func (c *Client) GetLANCoordinate() (*coordinate.Coordinate, error) { func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
return c.serf.GetCoordinate() lan, err := c.serf.GetCoordinate()
if err != nil {
return nil, err
}
cs := lib.CoordinateSet{c.config.Segment: lan}
return cs, nil
} }

137
agent/consul/client_serf.go Normal file
View File

@ -0,0 +1,137 @@
package consul
import (
"fmt"
"path/filepath"
"strings"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/serf"
)
// setupSerf is used to setup and initialize a Serf
func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (*serf.Serf, error) {
conf.Init()
conf.NodeName = c.config.NodeName
conf.Tags["role"] = "node"
conf.Tags["dc"] = c.config.Datacenter
conf.Tags["segment"] = c.config.Segment
conf.Tags["id"] = string(c.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", c.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["build"] = c.config.Build
conf.MemberlistConfig.LogOutput = c.config.LogOutput
conf.LogOutput = c.config.LogOutput
conf.Logger = c.logger
conf.EventCh = ch
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
segment: c.config.Segment,
}
conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// lanEventHandler is used to handle events from the lan Serf cluster
func (c *Client) lanEventHandler() {
var numQueuedEvents int
for {
numQueuedEvents = len(c.eventCh)
if numQueuedEvents > serfEventBacklogWarning {
c.logger.Printf("[WARN] consul: number of queued serf events above warning threshold: %d/%d", numQueuedEvents, serfEventBacklogWarning)
}
select {
case e := <-c.eventCh:
switch e.EventType() {
case serf.EventMemberJoin:
c.nodeJoin(e.(serf.MemberEvent))
case serf.EventMemberLeave, serf.EventMemberFailed:
c.nodeFail(e.(serf.MemberEvent))
case serf.EventUser:
c.localEvent(e.(serf.UserEvent))
case serf.EventMemberUpdate: // Ignore
case serf.EventMemberReap: // Ignore
case serf.EventQuery: // Ignore
default:
c.logger.Printf("[WARN] consul: unhandled LAN Serf Event: %#v", e)
}
case <-c.shutdownCh:
return
}
}
}
// nodeJoin is used to handle join events on the serf cluster
func (c *Client) nodeJoin(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
if parts.Datacenter != c.config.Datacenter {
c.logger.Printf("[WARN] consul: server %s for datacenter %s has joined wrong cluster",
m.Name, parts.Datacenter)
continue
}
c.logger.Printf("[INFO] consul: adding server %s", parts)
c.routers.AddServer(parts)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
}
}
// nodeFail is used to handle fail events on the serf cluster
func (c *Client) nodeFail(me serf.MemberEvent) {
for _, m := range me.Members {
ok, parts := metadata.IsConsulServer(m)
if !ok {
continue
}
c.logger.Printf("[INFO] consul: removing server %s", parts)
c.routers.RemoveServer(parts)
}
}
// localEvent is called when we receive an event on the local Serf
func (c *Client) localEvent(event serf.UserEvent) {
// Handle only consul events
if !strings.HasPrefix(event.Name, "consul:") {
return
}
switch name := event.Name; {
case name == newLeaderEvent:
c.logger.Printf("[INFO] consul: New leader elected: %s", event.Payload)
// Trigger the callback
if c.config.ServerUp != nil {
c.config.ServerUp()
}
case isUserEvent(name):
event.Name = rawUserEventName(name)
c.logger.Printf("[DEBUG] consul: user event: %s", event.Name)
// Trigger the callback
if c.config.UserEventHandler != nil {
c.config.UserEventHandler(event)
}
default:
c.logger.Printf("[WARN] consul: Unhandled local event: %v", event)
}
}

View File

@ -49,6 +49,15 @@ func init() {
} }
} }
// (Enterprise-only)
type NetworkSegment struct {
Name string
Bind string
Port int
Advertise string
SerfConfig *serf.Config
}
// Config is used to configure the server // Config is used to configure the server
type Config struct { type Config struct {
// Bootstrap mode is used to bring up the first Consul server. // Bootstrap mode is used to bring up the first Consul server.
@ -105,6 +114,13 @@ type Config struct {
// RPCSrcAddr is the source address for outgoing RPC connections. // RPCSrcAddr is the source address for outgoing RPC connections.
RPCSrcAddr *net.TCPAddr RPCSrcAddr *net.TCPAddr
// (Enterprise-only) The network segment this agent is part of.
Segment string
// (Enterprise-only) Segments is a list of network segments for a server to
// bind on.
Segments []NetworkSegment
// SerfLANConfig is the configuration for the intra-dc serf // SerfLANConfig is the configuration for the intra-dc serf
SerfLANConfig *serf.Config SerfLANConfig *serf.Config

View File

@ -10,7 +10,6 @@ import (
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
) )
// Coordinate manages queries and updates for network coordinates. // Coordinate manages queries and updates for network coordinates.
@ -18,8 +17,10 @@ type Coordinate struct {
// srv is a pointer back to the server. // srv is a pointer back to the server.
srv *Server srv *Server
// updates holds pending coordinate updates for the given nodes. // updates holds pending coordinate updates for the given nodes. This is
updates map[string]*coordinate.Coordinate // keyed by node:segment so we can get a coordinate for each segment for
// servers, and we only track the latest update per node:segment.
updates map[string]*structs.CoordinateUpdateRequest
// updatesLock synchronizes access to the updates map. // updatesLock synchronizes access to the updates map.
updatesLock sync.Mutex updatesLock sync.Mutex
@ -29,7 +30,7 @@ type Coordinate struct {
func NewCoordinate(srv *Server) *Coordinate { func NewCoordinate(srv *Server) *Coordinate {
c := &Coordinate{ c := &Coordinate{
srv: srv, srv: srv,
updates: make(map[string]*coordinate.Coordinate), updates: make(map[string]*structs.CoordinateUpdateRequest),
} }
go c.batchUpdate() go c.batchUpdate()
@ -58,7 +59,7 @@ func (c *Coordinate) batchApplyUpdates() error {
// incoming messages. // incoming messages.
c.updatesLock.Lock() c.updatesLock.Lock()
pending := c.updates pending := c.updates
c.updates = make(map[string]*coordinate.Coordinate) c.updates = make(map[string]*structs.CoordinateUpdateRequest)
c.updatesLock.Unlock() c.updatesLock.Unlock()
// Enforce the rate limit. // Enforce the rate limit.
@ -73,12 +74,16 @@ func (c *Coordinate) batchApplyUpdates() error {
// batches. // batches.
i := 0 i := 0
updates := make(structs.Coordinates, size) updates := make(structs.Coordinates, size)
for node, coord := range pending { for _, update := range pending {
if !(i < size) { if !(i < size) {
break break
} }
updates[i] = &structs.Coordinate{Node: node, Coord: coord} updates[i] = &structs.Coordinate{
Node: update.Node,
Segment: update.Segment,
Coord: update.Coord,
}
i++ i++
} }
@ -140,8 +145,9 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
} }
// Add the coordinate to the map of pending updates. // Add the coordinate to the map of pending updates.
key := fmt.Sprintf("%s:%s", args.Node, args.Segment)
c.updatesLock.Lock() c.updatesLock.Lock()
c.updates[args.Node] = args.Coord c.updates[key] = args
c.updatesLock.Unlock() c.updatesLock.Unlock()
return nil return nil
} }
@ -187,6 +193,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
if err := c.srv.filterACL(args.Token, reply); err != nil { if err := c.srv.filterACL(args.Token, reply); err != nil {
return err return err
} }
return nil return nil
}) })
} }

View File

@ -5,17 +5,18 @@ import (
"math" "math"
"math/rand" "math/rand"
"os" "os"
"reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
) )
// generateRandomCoordinate creates a random coordinate. This mucks with the // generateRandomCoordinate creates a random coordinate. This mucks with the
@ -33,15 +34,6 @@ func generateRandomCoordinate() *coordinate.Coordinate {
return coord return coord
} }
// verifyCoordinatesEqual will compare a and b and fail if they are not exactly
// equal (no floating point fuzz is considered since we are trying to make sure
// we are getting exactly the coordinates we expect, without math on them).
func verifyCoordinatesEqual(t *testing.T, a, b *coordinate.Coordinate) {
if !reflect.DeepEqual(a, b) {
t.Fatalf("coordinates are not equal: %v != %v", a, b)
}
}
func TestCoordinate_Update(t *testing.T) { func TestCoordinate_Update(t *testing.T) {
t.Parallel() t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) { dir1, s1 := testServerWithConfig(t, func(c *Config) {
@ -94,20 +86,17 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period // Make sure the updates did not yet apply because the update period
// hasn't expired. // hasn't expired.
state := s1.fsm.State() state := s1.fsm.State()
c, err := state.CoordinateGetRaw("node1") c, err := state.Coordinate("node1")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if c != nil { verify.Values(t, "", c, lib.CoordinateSet{})
t.Fatalf("should be nil because the update should be batched")
} c, err = state.Coordinate("node2")
c, err = state.CoordinateGetRaw("node2")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if c != nil { verify.Values(t, "", c, lib.CoordinateSet{})
t.Fatalf("should be nil because the update should be batched")
}
// Send another update for the second node. It should take precedence // Send another update for the second node. It should take precedence
// since there will be two updates in the same batch. // since there will be two updates in the same batch.
@ -118,22 +107,23 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up. // Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod) time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
c, err = state.CoordinateGetRaw("node1") c, err = state.Coordinate("node1")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if c == nil { expected := lib.CoordinateSet{
t.Fatalf("should return a coordinate but it's nil") "": arg1.Coord,
} }
verifyCoordinatesEqual(t, c, arg1.Coord) verify.Values(t, "", c, expected)
c, err = state.CoordinateGetRaw("node2")
c, err = state.Coordinate("node2")
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if c == nil { expected = lib.CoordinateSet{
t.Fatalf("should return a coordinate but it's nil") "": arg2.Coord,
} }
verifyCoordinatesEqual(t, c, arg2.Coord) verify.Values(t, "", c, expected)
// Register a bunch of additional nodes. // Register a bunch of additional nodes.
spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1 spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
@ -165,11 +155,11 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod) time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0 numDropped := 0
for i := 0; i < spamLen; i++ { for i := 0; i < spamLen; i++ {
c, err = state.CoordinateGetRaw(fmt.Sprintf("bogusnode%d", i)) c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i))
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
if c == nil { if len(c) == 0 {
numDropped++ numDropped++
} }
} }
@ -304,7 +294,7 @@ func TestCoordinate_ListDatacenters(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("bad: %v", err) t.Fatalf("bad: %v", err)
} }
verifyCoordinatesEqual(t, c, out[0].Coordinates[0].Coord) verify.Values(t, "", c, out[0].Coordinates[0].Coord)
} }
func TestCoordinate_ListNodes(t *testing.T) { func TestCoordinate_ListNodes(t *testing.T) {
@ -374,9 +364,9 @@ func TestCoordinate_ListNodes(t *testing.T) {
resp.Coordinates[2].Node != "foo" { resp.Coordinates[2].Node != "foo" {
r.Fatalf("bad: %v", resp.Coordinates) r.Fatalf("bad: %v", resp.Coordinates)
} }
verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar verify.Values(t, "", resp.Coordinates[0].Coord, arg2.Coord) // bar
verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz verify.Values(t, "", resp.Coordinates[1].Coord, arg3.Coord) // baz
verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo verify.Values(t, "", resp.Coordinates[2].Coord, arg1.Coord) // foo
}) })
} }

View File

@ -171,8 +171,8 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)}, {Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)}, {Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -444,8 +444,8 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)}, {Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)}, {Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
@ -748,8 +748,8 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
updates := structs.Coordinates{ updates := structs.Coordinates{
{"foo", lib.GenerateCoordinate(1 * time.Millisecond)}, {Node: "foo", Coord: lib.GenerateCoordinate(1 * time.Millisecond)},
{"bar", lib.GenerateCoordinate(2 * time.Millisecond)}, {Node: "bar", Coord: lib.GenerateCoordinate(2 * time.Millisecond)},
} }
if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil { if err := s1.fsm.State().CoordinateBatchUpdate(3, updates); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -85,8 +86,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
// Add the consul prefix to the event name // Add the consul prefix to the event name
eventName := userEventName(args.Name) eventName := userEventName(args.Name)
// Fire the event // Fire the event on all LAN segments
return m.srv.serfLAN.UserEvent(eventName, args.Payload, false) segments := m.srv.LANSegments()
var errs error
err = m.srv.serfLAN.UserEvent(eventName, args.Payload, false)
if err != nil {
err = fmt.Errorf("error broadcasting event to default segment: %v", err)
errs = multierror.Append(errs, err)
}
for name, segment := range segments {
err := segment.UserEvent(eventName, args.Payload, false)
if err != nil {
err = fmt.Errorf("error broadcasting event to segment %q: %v", name, err)
errs = multierror.Append(errs, err)
}
}
return errs
} }
// KeyringOperation will query the WAN and LAN gossip keyrings of all nodes. // KeyringOperation will query the WAN and LAN gossip keyrings of all nodes.
@ -130,23 +145,36 @@ func (m *Internal) KeyringOperation(
return nil return nil
} }
// executeKeyringOp executes the appropriate keyring-related function based on // executeKeyringOp executes the keyring-related operation in the request
// the type of keyring operation in the request. It takes the KeyManager as an // on either the WAN or LAN pools.
// argument, so it can handle any operation for either LAN or WAN pools.
func (m *Internal) executeKeyringOp( func (m *Internal) executeKeyringOp(
args *structs.KeyringRequest, args *structs.KeyringRequest,
reply *structs.KeyringResponses, reply *structs.KeyringResponses,
wan bool) { wan bool) {
if wan {
mgr := m.srv.KeyManagerWAN()
m.executeKeyringOpMgr(mgr, args, reply, wan)
} else {
segments := m.srv.LANSegments()
m.executeKeyringOpMgr(m.srv.KeyManagerLAN(), args, reply, wan)
for _, segment := range segments {
mgr := segment.KeyManager()
m.executeKeyringOpMgr(mgr, args, reply, wan)
}
}
}
// executeKeyringOpMgr executes the appropriate keyring-related function based on
// the type of keyring operation in the request. It takes the KeyManager as an
// argument, so it can handle any operation for either LAN or WAN pools.
func (m *Internal) executeKeyringOpMgr(
mgr *serf.KeyManager,
args *structs.KeyringRequest,
reply *structs.KeyringResponses,
wan bool) {
var serfResp *serf.KeyResponse var serfResp *serf.KeyResponse
var err error var err error
var mgr *serf.KeyManager
if wan {
mgr = m.srv.KeyManagerWAN()
} else {
mgr = m.srv.KeyManagerLAN()
}
opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor} opts := &serf.KeyRequestOptions{RelayFactor: args.RelayFactor}
switch args.Operation { switch args.Operation {

View File

@ -64,7 +64,12 @@ func (s *Server) leaderLoop(stopCh chan struct{}) {
// Fire a user event indicating a new leader // Fire a user event indicating a new leader
payload := []byte(s.config.NodeName) payload := []byte(s.config.NodeName)
if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil { if err := s.serfLAN.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event: %v", err) s.logger.Printf("[WARN] consul: failed to broadcast new leader event on default segment: %v", err)
}
for name, segment := range s.LANSegments() {
if err := segment.UserEvent(newLeaderEvent, payload, false); err != nil {
s.logger.Printf("[WARN] consul: failed to broadcast new leader event on segment %q: %v", name, err)
}
} }
// Reconcile channel is only used once initial reconcile // Reconcile channel is only used once initial reconcile
@ -439,7 +444,9 @@ func (s *Server) shouldHandleMember(member serf.Member) bool {
if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter { if valid, dc := isConsulNode(member); valid && dc == s.config.Datacenter {
return true return true
} }
if valid, parts := metadata.IsConsulServer(member); valid && parts.Datacenter == s.config.Datacenter { if valid, parts := metadata.IsConsulServer(member); valid &&
parts.Segment == "" &&
parts.Datacenter == s.config.Datacenter {
return true return true
} }
return false return false

View File

@ -15,6 +15,7 @@ type lanMergeDelegate struct {
dc string dc string
nodeID types.NodeID nodeID types.NodeID
nodeName string nodeName string
segment string
} }
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
@ -53,6 +54,10 @@ func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
return fmt.Errorf("Member '%s' part of wrong datacenter '%s'", return fmt.Errorf("Member '%s' part of wrong datacenter '%s'",
m.Name, parts.Datacenter) m.Name, parts.Datacenter)
} }
if segment := m.Tags["segment"]; segment != md.segment {
return fmt.Errorf("Member '%s' part of wrong segment '%s' (expected '%s')", m.Name, segment, md.segment)
}
} }
return nil return nil
} }

View File

@ -101,6 +101,7 @@ func TestMerge_LAN(t *testing.T) {
dc: "dc1", dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"), nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0", nodeName: "node0",
segment: "",
} }
for i, c := range cases { for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" { if err := delegate.NotifyMerge(c.members); c.expect == "" {

View File

@ -6,7 +6,6 @@ import (
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate"
) )
// nodeSorter takes a list of nodes and a parallel vector of distances and // nodeSorter takes a list of nodes and a parallel vector of distances and
@ -19,15 +18,16 @@ type nodeSorter struct {
// newNodeSorter returns a new sorter for the given source coordinate and set of // newNodeSorter returns a new sorter for the given source coordinate and set of
// nodes. // nodes.
func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (sort.Interface, error) { func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.Interface, error) {
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(nodes)) vec := make([]float64, len(nodes))
for i, node := range nodes { for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node) other, err := state.Coordinate(node.Node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = lib.ComputeDistance(c, coord) c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
} }
return &nodeSorter{nodes, vec}, nil return &nodeSorter{nodes, vec}, nil
} }
@ -58,15 +58,16 @@ type serviceNodeSorter struct {
// newServiceNodeSorter returns a new sorter for the given source coordinate and // newServiceNodeSorter returns a new sorter for the given source coordinate and
// set of service nodes. // set of service nodes.
func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.ServiceNodes) (sort.Interface, error) { func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.ServiceNodes) (sort.Interface, error) {
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(nodes)) vec := make([]float64, len(nodes))
for i, node := range nodes { for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node) other, err := state.Coordinate(node.Node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = lib.ComputeDistance(c, coord) c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
} }
return &serviceNodeSorter{nodes, vec}, nil return &serviceNodeSorter{nodes, vec}, nil
} }
@ -97,15 +98,16 @@ type healthCheckSorter struct {
// newHealthCheckSorter returns a new sorter for the given source coordinate and // newHealthCheckSorter returns a new sorter for the given source coordinate and
// set of health checks with nodes. // set of health checks with nodes.
func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.HealthChecks) (sort.Interface, error) { func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.HealthChecks) (sort.Interface, error) {
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(checks)) vec := make([]float64, len(checks))
for i, check := range checks { for i, check := range checks {
coord, err := state.CoordinateGetRaw(check.Node) other, err := state.Coordinate(check.Node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = lib.ComputeDistance(c, coord) c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
} }
return &healthCheckSorter{checks, vec}, nil return &healthCheckSorter{checks, vec}, nil
} }
@ -136,15 +138,16 @@ type checkServiceNodeSorter struct {
// newCheckServiceNodeSorter returns a new sorter for the given source coordinate // newCheckServiceNodeSorter returns a new sorter for the given source coordinate
// and set of nodes with health checks. // and set of nodes with health checks.
func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes structs.CheckServiceNodes) (sort.Interface, error) { func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.CheckServiceNodes) (sort.Interface, error) {
state := s.fsm.State() state := s.fsm.State()
vec := make([]float64, len(nodes)) vec := make([]float64, len(nodes))
for i, node := range nodes { for i, node := range nodes {
coord, err := state.CoordinateGetRaw(node.Node.Node) other, err := state.Coordinate(node.Node.Node)
if err != nil { if err != nil {
return nil, err return nil, err
} }
vec[i] = lib.ComputeDistance(c, coord) c1, c2 := cs.Intersect(other)
vec[i] = lib.ComputeDistance(c1, c2)
} }
return &checkServiceNodeSorter{nodes, vec}, nil return &checkServiceNodeSorter{nodes, vec}, nil
} }
@ -166,16 +169,16 @@ func (n *checkServiceNodeSorter) Less(i, j int) bool {
} }
// newSorterByDistanceFrom returns a sorter for the given type. // newSorterByDistanceFrom returns a sorter for the given type.
func (s *Server) newSorterByDistanceFrom(c *coordinate.Coordinate, subj interface{}) (sort.Interface, error) { func (s *Server) newSorterByDistanceFrom(cs lib.CoordinateSet, subj interface{}) (sort.Interface, error) {
switch v := subj.(type) { switch v := subj.(type) {
case structs.Nodes: case structs.Nodes:
return s.newNodeSorter(c, v) return s.newNodeSorter(cs, v)
case structs.ServiceNodes: case structs.ServiceNodes:
return s.newServiceNodeSorter(c, v) return s.newServiceNodeSorter(cs, v)
case structs.HealthChecks: case structs.HealthChecks:
return s.newHealthCheckSorter(c, v) return s.newHealthCheckSorter(cs, v)
case structs.CheckServiceNodes: case structs.CheckServiceNodes:
return s.newCheckServiceNodeSorter(c, v) return s.newCheckServiceNodeSorter(cs, v)
default: default:
panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj)) panic(fmt.Errorf("Unhandled type passed to newSorterByDistanceFrom: %#v", subj))
} }
@ -197,19 +200,19 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
return nil return nil
} }
// There won't always be a coordinate for the source node. If there's not // There won't always be coordinates for the source node. If there are
// one then we can bail out because there's no meaning for the sort. // none then we can bail out because there's no meaning for the sort.
state := s.fsm.State() state := s.fsm.State()
coord, err := state.CoordinateGetRaw(source.Node) cs, err := state.Coordinate(source.Node)
if err != nil { if err != nil {
return err return err
} }
if coord == nil { if len(cs) == 0 {
return nil return nil
} }
// Do the sort! // Do the sort!
sorter, err := s.newSorterByDistanceFrom(coord, subj) sorter, err := s.newSorterByDistanceFrom(cs, subj)
if err != nil { if err != nil {
return err return err
} }

View File

@ -0,0 +1,45 @@
// +build !ent
package consul
import (
"errors"
"github.com/hashicorp/serf/serf"
)
const (
errSegmentsNotSupported = "network segments are not supported in this version of Consul"
)
var (
ErrSegmentsNotSupported = errors.New(errSegmentsNotSupported)
)
// LANSegmentMembers is used to return the members of the given LAN segment.
func (s *Server) LANSegmentMembers(name string) ([]serf.Member, error) {
if name == "" {
return s.LANMembers(), nil
}
return nil, ErrSegmentsNotSupported
}
// LANSegmentAddr is used to return the address used for the given LAN segment.
func (s *Server) LANSegmentAddr(name string) string {
return ""
}
// setupSegments returns an error if any segments are defined since the OSS
// version of Consul doens't support them.
func (s *Server) setupSegments(config *Config, port int) error {
if len(config.Segments) > 0 {
return ErrSegmentsNotSupported
}
return nil
}
// floodSegments is a NOP in the OSS version of Consul.
func (s *Server) floodSegments(config *Config) {
}

View File

@ -51,6 +51,7 @@ const (
const ( const (
serfLANSnapshot = "serf/local.snapshot" serfLANSnapshot = "serf/local.snapshot"
serfLANSegmentSnapshot = "serf/local-segment-%s.snapshot"
serfWANSnapshot = "serf/remote.snapshot" serfWANSnapshot = "serf/remote.snapshot"
raftState = "raft/" raftState = "raft/"
snapshotsRetained = 2 snapshotsRetained = 2
@ -162,6 +163,10 @@ type Server struct {
// which contains all the DC nodes // which contains all the DC nodes
serfLAN *serf.Serf serfLAN *serf.Serf
// segmentLAN maps segment names to their Serf cluster
segmentLAN map[string]*serf.Serf
segmentLock sync.RWMutex
// serfWAN is the Serf cluster maintained between DC's // serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers // which SHOULD only consist of Consul servers
serfWAN *serf.Serf serfWAN *serf.Serf
@ -300,6 +305,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
rpcServer: rpc.NewServer(), rpcServer: rpc.NewServer(),
rpcTLS: incomingTLS, rpcTLS: incomingTLS,
reassertLeaderCh: make(chan chan error), reassertLeaderCh: make(chan chan error),
segmentLAN: make(map[string]*serf.Serf, len(config.Segments)),
sessionTimers: NewSessionTimers(), sessionTimers: NewSessionTimers(),
tombstoneGC: gc, tombstoneGC: gc,
serverLookup: NewServerLookup(), serverLookup: NewServerLookup(),
@ -353,7 +359,7 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
// Initialize the WAN Serf. // Initialize the WAN Serf.
serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort serfBindPortWAN := config.SerfWANConfig.MemberlistConfig.BindPort
s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN) s.serfWAN, err = s.setupSerf(config.SerfWANConfig, s.eventChWAN, serfWANSnapshot, true, serfBindPortWAN, "")
if err != nil { if err != nil {
s.Shutdown() s.Shutdown()
return nil, fmt.Errorf("Failed to start WAN Serf: %v", err) return nil, fmt.Errorf("Failed to start WAN Serf: %v", err)
@ -368,14 +374,24 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN) s.logger.Printf("[INFO] agent: Serf WAN TCP bound to port %d", serfBindPortWAN)
} }
// Initialize the LAN Serf. // Initialize the LAN segments before the default LAN Serf so we have
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN) // updated port information to publish there.
if err := s.setupSegments(config, serfBindPortWAN); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to setup network segments: %v", err)
}
// Initialize the LAN Serf for the default network segment.
s.serfLAN, err = s.setupSerf(config.SerfLANConfig, s.eventChLAN, serfLANSnapshot, false, serfBindPortWAN, "")
if err != nil { if err != nil {
s.Shutdown() s.Shutdown()
return nil, fmt.Errorf("Failed to start LAN Serf: %v", err) return nil, fmt.Errorf("Failed to start LAN Serf: %v", err)
} }
go s.lanEventHandler() go s.lanEventHandler()
// Start the flooders after the LAN event handler is wired up.
s.floodSegments(config)
// Add a "static route" to the WAN Serf and hook it up to Serf events. // Add a "static route" to the WAN Serf and hook it up to Serf events.
if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil { if err := s.router.AddArea(types.AreaWAN, s.serfWAN, s.connPool, s.config.VerifyOutgoing); err != nil {
s.Shutdown() s.Shutdown()
@ -413,67 +429,6 @@ func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*
return s, nil return s, nil
} }
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int) (*serf.Serf, error) {
addr := s.Listener.Addr().(*net.TCPAddr)
conf.Init()
if wan {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["id"] = string(s.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
}
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
if s.config.UseTLS {
conf.Tags["use_tls"] = "1"
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.Logger = s.logger
conf.EventCh = ch
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
}
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
}
}
// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// setupRaft is used to setup and initialize Raft // setupRaft is used to setup and initialize Raft
func (s *Server) setupRaft() error { func (s *Server) setupRaft() error {
// If we have an unclean exit then attempt to close the Raft store. // If we have an unclean exit then attempt to close the Raft store.
@ -931,6 +886,19 @@ func (s *Server) Encrypted() bool {
return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled() return s.serfLAN.EncryptionEnabled() && s.serfWAN.EncryptionEnabled()
} }
// LANSegments returns a map of LAN segments by name
func (s *Server) LANSegments() map[string]*serf.Serf {
s.segmentLock.RLock()
defer s.segmentLock.RUnlock()
segments := make(map[string]*serf.Serf, len(s.segmentLAN))
for name, segment := range s.segmentLAN {
segments[name] = segment
}
return segments
}
// inmemCodec is used to do an RPC call without going over a network // inmemCodec is used to do an RPC call without going over a network
type inmemCodec struct { type inmemCodec struct {
method string method string
@ -1042,8 +1010,21 @@ func (s *Server) Stats() map[string]map[string]string {
} }
// GetLANCoordinate returns the coordinate of the server in the LAN gossip pool. // GetLANCoordinate returns the coordinate of the server in the LAN gossip pool.
func (s *Server) GetLANCoordinate() (*coordinate.Coordinate, error) { func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
return s.serfLAN.GetCoordinate() lan, err := s.serfLAN.GetCoordinate()
if err != nil {
return nil, err
}
cs := lib.CoordinateSet{"": lan}
for name, segment := range s.segmentLAN {
c, err := segment.GetCoordinate()
if err != nil {
return nil, err
}
cs[name] = c
}
return cs, nil
} }
// GetWANCoordinate returns the coordinate of the server in the WAN gossip pool. // GetWANCoordinate returns the coordinate of the server in the WAN gossip pool.

View File

@ -1,10 +1,14 @@
package consul package consul
import ( import (
"fmt"
"net"
"path/filepath"
"strings" "strings"
"time" "time"
"github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -24,6 +28,76 @@ const (
peerRetryBase = 1 * time.Second peerRetryBase = 1 * time.Second
) )
// setupSerf is used to setup and initialize a Serf
func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, wan bool, wanPort int, segment string) (*serf.Serf, error) {
conf.Init()
if wan {
conf.NodeName = fmt.Sprintf("%s.%s", s.config.NodeName, s.config.Datacenter)
} else {
conf.NodeName = s.config.NodeName
conf.Tags["wan_join_port"] = fmt.Sprintf("%d", wanPort)
}
conf.Tags["role"] = "consul"
conf.Tags["dc"] = s.config.Datacenter
conf.Tags["segment"] = segment
if segment == "" {
for _, s := range s.config.Segments {
conf.Tags["segment_port_"+s.Name] = fmt.Sprintf("%d", s.Port)
}
}
conf.Tags["id"] = string(s.config.NodeID)
conf.Tags["vsn"] = fmt.Sprintf("%d", s.config.ProtocolVersion)
conf.Tags["vsn_min"] = fmt.Sprintf("%d", ProtocolVersionMin)
conf.Tags["vsn_max"] = fmt.Sprintf("%d", ProtocolVersionMax)
conf.Tags["raft_vsn"] = fmt.Sprintf("%d", s.config.RaftConfig.ProtocolVersion)
conf.Tags["build"] = s.config.Build
addr := s.Listener.Addr().(*net.TCPAddr)
conf.Tags["port"] = fmt.Sprintf("%d", addr.Port)
if s.config.Bootstrap {
conf.Tags["bootstrap"] = "1"
}
if s.config.BootstrapExpect != 0 {
conf.Tags["expect"] = fmt.Sprintf("%d", s.config.BootstrapExpect)
}
if s.config.NonVoter {
conf.Tags["nonvoter"] = "1"
}
if s.config.UseTLS {
conf.Tags["use_tls"] = "1"
}
conf.MemberlistConfig.LogOutput = s.config.LogOutput
conf.LogOutput = s.config.LogOutput
conf.Logger = s.logger
conf.EventCh = ch
conf.ProtocolVersion = protocolVersionMap[s.config.ProtocolVersion]
conf.RejoinAfterLeave = s.config.RejoinAfterLeave
if wan {
conf.Merge = &wanMergeDelegate{}
} else {
conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
segment: segment,
}
}
// Until Consul supports this fully, we disable automatic resolution.
// When enabled, the Serf gossip may just turn off if we are the minority
// node which is rather unexpected.
conf.EnableNameConflictResolution = false
if !s.config.DevMode {
conf.SnapshotPath = filepath.Join(s.config.DataDir, path)
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err
}
return serf.Create(conf)
}
// userEventName computes the name of a user event // userEventName computes the name of a user event
func userEventName(name string) string { func userEventName(name string) string {
return userEventPrefix + name return userEventPrefix + name
@ -126,7 +200,7 @@ func (s *Server) localEvent(event serf.UserEvent) {
func (s *Server) lanNodeJoin(me serf.MemberEvent) { func (s *Server) lanNodeJoin(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m) ok, serverMeta := metadata.IsConsulServer(m)
if !ok { if !ok || serverMeta.Segment != "" {
continue continue
} }
s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta) s.logger.Printf("[INFO] consul: Adding LAN server %s", serverMeta)
@ -262,7 +336,7 @@ func (s *Server) maybeBootstrap() {
func (s *Server) lanNodeFailed(me serf.MemberEvent) { func (s *Server) lanNodeFailed(me serf.MemberEvent) {
for _, m := range me.Members { for _, m := range me.Members {
ok, serverMeta := metadata.IsConsulServer(m) ok, serverMeta := metadata.IsConsulServer(m)
if !ok { if !ok || serverMeta.Segment != "" {
continue continue
} }
s.logger.Printf("[INFO] consul: Removing LAN server %s", serverMeta) s.logger.Printf("[INFO] consul: Removing LAN server %s", serverMeta)

View File

@ -370,12 +370,12 @@ func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error
} }
} }
// Delete any coordinate associated with this node. // Delete any coordinates associated with this node.
coord, err := tx.First("coordinates", "id", nodeName) coords, err := tx.Get("coordinates", "node", nodeName)
if err != nil { if err != nil {
return fmt.Errorf("failed coordinate lookup: %s", err) return fmt.Errorf("failed coordinate lookup: %s", err)
} }
if coord != nil { for coord := coords.Next(); coord != nil; coord = coords.Next() {
if err := tx.Delete("coordinates", coord); err != nil { if err := tx.Delete("coordinates", coord); err != nil {
return fmt.Errorf("failed deleting coordinate: %s", err) return fmt.Errorf("failed deleting coordinate: %s", err)
} }

View File

@ -4,8 +4,8 @@ import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
) )
// Coordinates is used to pull all the coordinates from the snapshot. // Coordinates is used to pull all the coordinates from the snapshot.
@ -40,26 +40,23 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
return nil return nil
} }
// CoordinateGetRaw queries for the coordinate of the given node. This is an // Coordinate returns a map of coordinates for the given node, indexed by
// unusual state store method because it just returns the raw coordinate or // network segment.
// nil, none of the Raft or node information is returned. This hits the 90% func (s *Store) Coordinate(node string) (lib.CoordinateSet, error) {
// internal-to-Consul use case for this data, and this isn't exposed via an
// endpoint, so it doesn't matter that the Raft info isn't available.
func (s *Store) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
tx := s.db.Txn(false) tx := s.db.Txn(false)
defer tx.Abort() defer tx.Abort()
// Pull the full coordinate entry. iter, err := tx.Get("coordinates", "node", node)
coord, err := tx.First("coordinates", "id", node)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed coordinate lookup: %s", err) return nil, fmt.Errorf("failed coordinate lookup: %s", err)
} }
// Pick out just the raw coordinate. results := make(lib.CoordinateSet)
if coord != nil { for raw := iter.Next(); raw != nil; raw = iter.Next() {
return coord.(*structs.Coordinate).Coord, nil coord := raw.(*structs.Coordinate)
results[coord.Segment] = coord.Coord
} }
return nil, nil return results, nil
} }
// Coordinates queries for all nodes with coordinates. // Coordinates queries for all nodes with coordinates.

View File

@ -3,12 +3,13 @@ package state
import ( import (
"math" "math"
"math/rand" "math/rand"
"reflect"
"testing" "testing"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb" "github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
) )
// generateRandomCoordinate creates a random coordinate. This mucks with the // generateRandomCoordinate creates a random coordinate. This mucks with the
@ -30,25 +31,22 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
// Make sure the coordinates list starts out empty, and that a query for // Make sure the coordinates list starts out empty, and that a query for
// a raw coordinate for a nonexistent node doesn't do anything bad. // a per-node coordinate for a nonexistent node doesn't do anything bad.
ws := memdb.NewWatchSet() ws := memdb.NewWatchSet()
idx, coords, err := s.Coordinates(ws) idx, all, err := s.Coordinates(ws)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 0 { if idx != 0 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if coords != nil { verify.Values(t, "", all, structs.Coordinates{})
t.Fatalf("bad: %#v", coords)
} coords, err := s.Coordinate("nope")
coord, err := s.CoordinateGetRaw("nope")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if coord != nil { verify.Values(t, "", coords, lib.CoordinateSet{})
t.Fatalf("bad: %#v", coord)
}
// Make an update for nodes that don't exist and make sure they get // Make an update for nodes that don't exist and make sure they get
// ignored. // ignored.
@ -72,16 +70,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should still be empty, though applying an empty batch does bump // Should still be empty, though applying an empty batch does bump
// the table index. // the table index.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws) idx, all, err = s.Coordinates(ws)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 1 { if idx != 1 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if coords != nil { verify.Values(t, "", all, structs.Coordinates{})
t.Fatalf("bad: %#v", coords)
}
// Register the nodes then do the update again. // Register the nodes then do the update again.
testRegisterNode(t, s, 1, "node1") testRegisterNode(t, s, 1, "node1")
@ -95,26 +91,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should go through now. // Should go through now.
ws = memdb.NewWatchSet() ws = memdb.NewWatchSet()
idx, coords, err = s.Coordinates(ws) idx, all, err = s.Coordinates(ws)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 3 { if idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if !reflect.DeepEqual(coords, updates) { verify.Values(t, "", all, updates)
t.Fatalf("bad: %#v", coords)
}
// Also verify the raw coordinate interface. // Also verify the per-node coordinate interface.
for _, update := range updates { for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node) coords, err := s.Coordinate(update.Node)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !reflect.DeepEqual(coord, update.Coord) { expected := lib.CoordinateSet{
t.Fatalf("bad: %#v", coord) "": update.Coord,
} }
verify.Values(t, "", coords, expected)
} }
// Update the coordinate for one of the nodes. // Update the coordinate for one of the nodes.
@ -127,26 +122,25 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
} }
// Verify it got applied. // Verify it got applied.
idx, coords, err = s.Coordinates(nil) idx, all, err = s.Coordinates(nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 4 { if idx != 4 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if !reflect.DeepEqual(coords, updates) { verify.Values(t, "", all, updates)
t.Fatalf("bad: %#v", coords)
}
// And check the raw coordinate version of the same thing. // And check the per-node coordinate version of the same thing.
for _, update := range updates { for _, update := range updates {
coord, err := s.CoordinateGetRaw(update.Node) coords, err := s.Coordinate(update.Node)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !reflect.DeepEqual(coord, update.Coord) { expected := lib.CoordinateSet{
t.Fatalf("bad: %#v", coord) "": update.Coord,
} }
verify.Values(t, "", coords, expected)
} }
// Apply an invalid update and make sure it gets ignored. // Apply an invalid update and make sure it gets ignored.
@ -162,16 +156,14 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Verify we are at the previous state, though the empty batch does bump // Verify we are at the previous state, though the empty batch does bump
// the table index. // the table index.
idx, coords, err = s.Coordinates(nil) idx, all, err = s.Coordinates(nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 5 { if idx != 5 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if !reflect.DeepEqual(coords, updates) { verify.Values(t, "", all, updates)
t.Fatalf("bad: %#v", coords)
}
} }
func TestStateStore_Coordinate_Cleanup(t *testing.T) { func TestStateStore_Coordinate_Cleanup(t *testing.T) {
@ -182,6 +174,12 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
updates := structs.Coordinates{ updates := structs.Coordinates{
&structs.Coordinate{ &structs.Coordinate{
Node: "node1", Node: "node1",
Segment: "alpha",
Coord: generateRandomCoordinate(),
},
&structs.Coordinate{
Node: "node1",
Segment: "beta",
Coord: generateRandomCoordinate(), Coord: generateRandomCoordinate(),
}, },
} }
@ -190,13 +188,15 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
} }
// Make sure it's in there. // Make sure it's in there.
coord, err := s.CoordinateGetRaw("node1") coords, err := s.Coordinate("node1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if !reflect.DeepEqual(coord, updates[0].Coord) { expected := lib.CoordinateSet{
t.Fatalf("bad: %#v", coord) "alpha": updates[0].Coord,
"beta": updates[1].Coord,
} }
verify.Values(t, "", coords, expected)
// Now delete the node. // Now delete the node.
if err := s.DeleteNode(3, "node1"); err != nil { if err := s.DeleteNode(3, "node1"); err != nil {
@ -204,25 +204,21 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
} }
// Make sure the coordinate is gone. // Make sure the coordinate is gone.
coord, err = s.CoordinateGetRaw("node1") coords, err = s.Coordinate("node1")
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if coord != nil { verify.Values(t, "", coords, lib.CoordinateSet{})
t.Fatalf("bad: %#v", coord)
}
// Make sure the index got updated. // Make sure the index got updated.
idx, coords, err := s.Coordinates(nil) idx, all, err := s.Coordinates(nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
if idx != 3 { if idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if coords != nil { verify.Values(t, "", all, structs.Coordinates{})
t.Fatalf("bad: %#v", coords)
}
} }
func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) { func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
@ -291,9 +287,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
// The snapshot will have the bad update in it, since we don't filter on // The snapshot will have the bad update in it, since we don't filter on
// the read side. // the read side.
if !reflect.DeepEqual(dump, append(updates, badUpdate)) { verify.Values(t, "", dump, append(updates, badUpdate))
t.Fatalf("bad: %#v", dump)
}
// Restore the values into a new state store. // Restore the values into a new state store.
func() { func() {
@ -312,9 +306,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
if idx != 6 { if idx != 6 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
if !reflect.DeepEqual(res, updates) { verify.Values(t, "", res, updates)
t.Fatalf("bad: %#v", res)
}
// Check that the index was updated (note that it got passed // Check that the index was updated (note that it got passed
// in during the restore). // in during the restore).

View File

@ -374,6 +374,26 @@ func coordinatesTableSchema() *memdb.TableSchema {
Name: "id", Name: "id",
AllowMissing: false, AllowMissing: false,
Unique: true, Unique: true,
Indexer: &memdb.CompoundIndex{
// AllowMissing is required since we allow
// Segment to be an empty string.
AllowMissing: true,
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "Node",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Segment",
Lowercase: true,
},
},
},
},
"node": &memdb.IndexSchema{
Name: "node",
AllowMissing: false,
Unique: false,
Indexer: &memdb.StringFieldIndex{ Indexer: &memdb.StringFieldIndex{
Field: "Node", Field: "Node",
Lowercase: true, Lowercase: true,

View File

@ -81,5 +81,18 @@ func (s *HTTPServer) CoordinateNodes(resp http.ResponseWriter, req *http.Request
if out.Coordinates == nil { if out.Coordinates == nil {
out.Coordinates = make(structs.Coordinates, 0) out.Coordinates = make(structs.Coordinates, 0)
} }
// Filter by segment if applicable
if v, ok := req.URL.Query()["segment"]; ok && len(v) > 0 {
segment := v[0]
filtered := make(structs.Coordinates, 0)
for _, coord := range out.Coordinates {
if coord.Segment == segment {
filtered = append(filtered, coord)
}
}
out.Coordinates = filtered
}
return out.Coordinates, nil return out.Coordinates, nil
} }

View File

@ -68,6 +68,7 @@ func TestCoordinate_Nodes(t *testing.T) {
arg1 := structs.CoordinateUpdateRequest{ arg1 := structs.CoordinateUpdateRequest{
Datacenter: "dc1", Datacenter: "dc1",
Node: "foo", Node: "foo",
Segment: "alpha",
Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()), Coord: coordinate.NewCoordinate(coordinate.DefaultConfig()),
} }
var out struct{} var out struct{}
@ -99,4 +100,43 @@ func TestCoordinate_Nodes(t *testing.T) {
coordinates[1].Node != "foo" { coordinates[1].Node != "foo" {
t.Fatalf("bad: %v", coordinates) t.Fatalf("bad: %v", coordinates)
} }
// Filter on a nonexistant node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=nope", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 0 {
t.Fatalf("bad: %v", coordinates)
}
// Filter on a real node segment
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=alpha", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "foo" {
t.Fatalf("bad: %v", coordinates)
}
// Make sure the empty filter works
req, _ = http.NewRequest("GET", "/v1/coordinate/nodes?segment=", nil)
resp = httptest.NewRecorder()
obj, err = a.srv.CoordinateNodes(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
coordinates = obj.(structs.Coordinates)
if len(coordinates) != 1 || coordinates[0].Node != "bar" {
t.Fatalf("bad: %v", coordinates)
}
} }

View File

@ -127,6 +127,7 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
id := services.NodeServices.Node.ID id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != a.Config.NodeID || if id != a.Config.NodeID ||
!reflect.DeepEqual(addrs, a.Config.TaggedAddresses) || !reflect.DeepEqual(addrs, a.Config.TaggedAddresses) ||
!reflect.DeepEqual(meta, a.Config.Meta) { !reflect.DeepEqual(meta, a.Config.Meta) {
@ -828,6 +829,7 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
id := services.NodeServices.Node.ID id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != a.Config.NodeID || if id != a.Config.NodeID ||
!reflect.DeepEqual(addrs, a.Config.TaggedAddresses) || !reflect.DeepEqual(addrs, a.Config.TaggedAddresses) ||
!reflect.DeepEqual(meta, a.Config.Meta) { !reflect.DeepEqual(meta, a.Config.Meta) {
@ -1364,6 +1366,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
id := services.NodeServices.Node.ID id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != cfg.NodeID || if id != cfg.NodeID ||
!reflect.DeepEqual(addrs, cfg.TaggedAddresses) || !reflect.DeepEqual(addrs, cfg.TaggedAddresses) ||
!reflect.DeepEqual(meta, cfg.Meta) { !reflect.DeepEqual(meta, cfg.Meta) {
@ -1387,6 +1390,7 @@ func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
id := services.NodeServices.Node.ID id := services.NodeServices.Node.ID
addrs := services.NodeServices.Node.TaggedAddresses addrs := services.NodeServices.Node.TaggedAddresses
meta := services.NodeServices.Node.Meta meta := services.NodeServices.Node.Meta
delete(meta, structs.MetaSegmentKey) // Added later, not in config.
if id != cfg.NodeID || if id != cfg.NodeID ||
!reflect.DeepEqual(addrs, cfg.TaggedAddresses) || !reflect.DeepEqual(addrs, cfg.TaggedAddresses) ||
!reflect.DeepEqual(meta, cfg.Meta) { !reflect.DeepEqual(meta, cfg.Meta) {

View File

@ -10,6 +10,7 @@ import (
"net" "net"
"regexp" "regexp"
"strconv" "strconv"
"strings"
"github.com/hashicorp/go-version" "github.com/hashicorp/go-version"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
@ -30,7 +31,9 @@ type Server struct {
Name string Name string
ID string ID string
Datacenter string Datacenter string
Segment string
Port int Port int
SegmentPorts map[string]int
WanJoinPort int WanJoinPort int
Bootstrap bool Bootstrap bool
Expect int Expect int
@ -73,8 +76,8 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
} }
datacenter := m.Tags["dc"] datacenter := m.Tags["dc"]
segment := m.Tags["segment"]
_, bootstrap := m.Tags["bootstrap"] _, bootstrap := m.Tags["bootstrap"]
_, useTLS := m.Tags["use_tls"] _, useTLS := m.Tags["use_tls"]
expect := 0 expect := 0
@ -93,6 +96,17 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
return false, nil return false, nil
} }
segment_ports := make(map[string]int)
for name, value := range m.Tags {
if strings.HasPrefix(name, "segment_port_") {
segment_port, err := strconv.Atoi(value)
if err != nil {
return false, nil
}
segment_ports[strings.TrimPrefix(name, "segment_port_")] = segment_port
}
}
build_version, err := version.NewVersion(versionFormat.FindString(m.Tags["build"])) build_version, err := version.NewVersion(versionFormat.FindString(m.Tags["build"]))
if err != nil { if err != nil {
return false, nil return false, nil
@ -130,7 +144,9 @@ func IsConsulServer(m serf.Member) (bool, *Server) {
Name: m.Name, Name: m.Name,
ID: m.Tags["id"], ID: m.Tags["id"],
Datacenter: datacenter, Datacenter: datacenter,
Segment: segment,
Port: port, Port: port,
SegmentPorts: segment_ports,
WanJoinPort: wan_join_port, WanJoinPort: wan_join_port,
Bootstrap: bootstrap, Bootstrap: bootstrap,
Expect: expect, Expect: expect,

View File

@ -74,6 +74,9 @@ const (
// metaValueMaxLength is the maximum allowed length of a metadata value // metaValueMaxLength is the maximum allowed length of a metadata value
metaValueMaxLength = 512 metaValueMaxLength = 512
// MetaSegmentKey is the node metadata key used to store the node's network segment
MetaSegmentKey = "consul-network-segment"
// MaxLockDelay provides a maximum LockDelay value for // MaxLockDelay provides a maximum LockDelay value for
// a session. Any value above this will not be respected. // a session. Any value above this will not be respected.
MaxLockDelay = 60 * time.Second MaxLockDelay = 60 * time.Second
@ -748,6 +751,7 @@ type IndexedSessions struct {
// Coordinate stores a node name with its associated network coordinate. // Coordinate stores a node name with its associated network coordinate.
type Coordinate struct { type Coordinate struct {
Node string Node string
Segment string
Coord *coordinate.Coordinate Coord *coordinate.Coordinate
} }
@ -781,6 +785,7 @@ type DatacenterMap struct {
type CoordinateUpdateRequest struct { type CoordinateUpdateRequest struct {
Datacenter string Datacenter string
Node string Node string
Segment string
Coord *coordinate.Coordinate Coord *coordinate.Coordinate
WriteRequest WriteRequest
} }

View File

@ -235,6 +235,13 @@ func (a *TestAgent) HTTPAddr() string {
return a.srv.Addr return a.srv.Addr
} }
func (a *TestAgent) SegmentAddr(name string) string {
if server, ok := a.Agent.delegate.(*consul.Server); ok {
return server.LANSegmentAddr(name)
}
return ""
}
func (a *TestAgent) Client() *api.Client { func (a *TestAgent) Client() *api.Client {
conf := api.DefaultConfig() conf := api.DefaultConfig()
conf.Address = a.HTTPAddr() conf.Address = a.HTTPAddr()

View File

@ -44,6 +44,15 @@ type AgentMember struct {
DelegateCur uint8 DelegateCur uint8
} }
// MemberOpts is used for querying member information.
type MemberOpts struct {
// Wan is whether to show members from the LAN.
Wan bool
// Segment is the LAN segment to show members
Segment string
}
// AgentServiceRegistration is used to register a new service // AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct { type AgentServiceRegistration struct {
ID string `json:",omitempty"` ID string `json:",omitempty"`
@ -256,6 +265,28 @@ func (a *Agent) Members(wan bool) ([]*AgentMember, error) {
return out, nil return out, nil
} }
// Members returns the known gossip members. The WAN
// flag can be used to query a server for WAN members.
func (a *Agent) MembersOpts(wan bool, segment string) ([]*AgentMember, error) {
r := a.c.newRequest("GET", "/v1/agent/members")
r.params.Set("segment", segment)
if wan {
r.params.Set("wan", "1")
}
_, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
defer resp.Body.Close()
var out []*AgentMember
if err := decodeBody(resp, &out); err != nil {
return nil, err
}
return out, nil
}
// ServiceRegister is used to register a new service with // ServiceRegister is used to register a new service with
// the local agent // the local agent
func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error { func (a *Agent) ServiceRegister(service *AgentServiceRegistration) error {

View File

@ -48,7 +48,9 @@ func TestAPI_CatalogNodes(t *testing.T) {
"lan": "127.0.0.1", "lan": "127.0.0.1",
"wan": "127.0.0.1", "wan": "127.0.0.1",
}, },
Meta: map[string]string{}, Meta: map[string]string{
"consul-network-segment": "",
},
CreateIndex: meta.LastIndex - 1, CreateIndex: meta.LastIndex - 1,
ModifyIndex: meta.LastIndex, ModifyIndex: meta.LastIndex,
}, },

View File

@ -7,6 +7,7 @@ import (
// CoordinateEntry represents a node and its associated network coordinate. // CoordinateEntry represents a node and its associated network coordinate.
type CoordinateEntry struct { type CoordinateEntry struct {
Node string Node string
Segment string
Coord *coordinate.Coordinate Coord *coordinate.Coordinate
} }

11
api/operator_segment.go Normal file
View File

@ -0,0 +1,11 @@
package api
// SegmentList returns all the available LAN segments.
func (op *Operator) SegmentList(q *QueryOptions) ([]string, *QueryMeta, error) {
var out []string
qm, err := op.c.query("/v1/operator/segment/list", &out, q)
if err != nil {
return nil, nil, err
}
return out, qm, nil
}

View File

@ -117,6 +117,7 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
f.StringVar(&cmdCfg.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.") f.StringVar(&cmdCfg.AdvertiseAddr, "advertise", "", "Sets the advertise address to use.")
f.StringVar(&cmdCfg.AdvertiseAddrWan, "advertise-wan", "", f.StringVar(&cmdCfg.AdvertiseAddrWan, "advertise-wan", "",
"Sets address to advertise on WAN instead of -advertise address.") "Sets address to advertise on WAN instead of -advertise address.")
f.StringVar(&cmdCfg.Segment, "segment", "", "(Enterprise-only) Sets the network segment to join.")
f.IntVar(&cmdCfg.Protocol, "protocol", -1, f.IntVar(&cmdCfg.Protocol, "protocol", -1,
"Sets the protocol version. Defaults to latest.") "Sets the protocol version. Defaults to latest.")
@ -224,6 +225,10 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
key, value := agent.ParseMetaPair(entry) key, value := agent.ParseMetaPair(entry)
cmdCfg.Meta[key] = value cmdCfg.Meta[key] = value
} }
if err := structs.ValidateMetadata(cmdCfg.Meta); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
return nil
}
} }
cfg := agent.DefaultConfig() cfg := agent.DefaultConfig()
@ -508,11 +513,6 @@ func (cmd *AgentCommand) readConfig() *agent.Config {
cmd.UI.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary") cmd.UI.Error("WARNING: Bootstrap mode enabled! Do not enable unless necessary")
} }
// Verify the node metadata entries are valid
if err := structs.ValidateMetadata(cfg.Meta); err != nil {
cmd.UI.Error(fmt.Sprintf("Failed to parse node metadata: %v", err))
}
// It doesn't make sense to include both UI options. // It doesn't make sense to include both UI options.
if cfg.EnableUI == true && cfg.UIDir != "" { if cfg.EnableUI == true && cfg.UIDir != "" {
cmd.UI.Error("Both the ui and ui-dir flags were specified, please provide only one") cmd.UI.Error("Both the ui and ui-dir flags were specified, please provide only one")
@ -804,17 +804,22 @@ func (cmd *AgentCommand) run(args []string) int {
// Let the agent know we've finished registration // Let the agent know we've finished registration
agent.StartSync() agent.StartSync()
segment := config.Segment
if config.Server {
segment = "<all>"
}
cmd.UI.Output("Consul agent running!") cmd.UI.Output("Consul agent running!")
cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion)) cmd.UI.Info(fmt.Sprintf(" Version: '%s'", cmd.HumanVersion))
cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID)) cmd.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID))
cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName)) cmd.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName))
cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s'", config.Datacenter)) cmd.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment))
cmd.UI.Info(fmt.Sprintf(" Server: %v (bootstrap: %v)", config.Server, config.Bootstrap)) cmd.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.Server, config.Bootstrap))
cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr, cmd.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, DNS: %d)", config.ClientAddr,
config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS)) config.Ports.HTTP, config.Ports.HTTPS, config.Ports.DNS))
cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr, cmd.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddr,
config.Ports.SerfLan, config.Ports.SerfWan)) config.Ports.SerfLan, config.Ports.SerfWan))
cmd.UI.Info(fmt.Sprintf("Gossip encrypt: %v, RPC-TLS: %v, TLS-Incoming: %v", cmd.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v",
agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming)) agent.GossipEncrypted(), config.VerifyOutgoing, config.VerifyIncoming))
// Enable log streaming // Enable log streaming

View File

@ -196,8 +196,11 @@ func TestReadCliConfig(t *testing.T) {
if config.SerfLanBindAddr != "4.3.2.2" { if config.SerfLanBindAddr != "4.3.2.2" {
t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr) t.Fatalf("expected -serf-lan-bind 4.3.2.2 got %s", config.SerfLanBindAddr)
} }
if len(config.Meta) != 1 || config.Meta["somekey"] != "somevalue" { expected := map[string]string{
t.Fatalf("expected somekey=somevalue, got %v", config.Meta) "somekey": "somevalue",
}
if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected)
} }
} }
@ -213,11 +216,11 @@ func TestReadCliConfig(t *testing.T) {
ShutdownCh: shutdownCh, ShutdownCh: shutdownCh,
BaseCommand: baseCommand(cli.NewMockUi()), BaseCommand: baseCommand(cli.NewMockUi()),
} }
config := cmd.readConfig()
expected := map[string]string{ expected := map[string]string{
"somekey": "somevalue", "somekey": "somevalue",
"otherkey": "othervalue", "otherkey": "othervalue",
} }
config := cmd.readConfig()
if !reflect.DeepEqual(config.Meta, expected) { if !reflect.DeepEqual(config.Meta, expected) {
t.Fatalf("bad: %v %v", config.Meta, expected) t.Fatalf("bad: %v %v", config.Meta, expected)
} }

View File

@ -33,6 +33,7 @@ func (c *MembersCommand) Run(args []string) int {
var detailed bool var detailed bool
var wan bool var wan bool
var statusFilter string var statusFilter string
var segment string
f := c.BaseCommand.NewFlagSet(c) f := c.BaseCommand.NewFlagSet(c)
f.BoolVar(&detailed, "detailed", false, f.BoolVar(&detailed, "detailed", false,
@ -43,6 +44,9 @@ func (c *MembersCommand) Run(args []string) int {
f.StringVar(&statusFilter, "status", ".*", f.StringVar(&statusFilter, "status", ".*",
"If provided, output is filtered to only nodes matching the regular "+ "If provided, output is filtered to only nodes matching the regular "+
"expression for status.") "expression for status.")
f.StringVar(&segment, "segment", "",
"(Enterprise-only) If provided, output is filtered to only nodes in"+
"the given segment.")
if err := c.BaseCommand.Parse(args); err != nil { if err := c.BaseCommand.Parse(args); err != nil {
return 1 return 1
@ -61,16 +65,39 @@ func (c *MembersCommand) Run(args []string) int {
return 1 return 1
} }
members, err := client.Agent().Members(wan) members, err := client.Agent().MembersOpts(wan, segment)
if err != nil { if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err)) c.UI.Error(fmt.Sprintf("Error retrieving members: %s", err))
return 1 return 1
} }
// Check if we queried a server and need to query for members in all segments.
if !wan && segment == "" {
self, err := client.Agent().Self()
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving agent info: %s", err))
return 1
}
if self["Config"]["Server"].(bool) {
segmentMembers, err := getSegmentMembers(client)
if err != nil {
c.UI.Error(fmt.Sprintf("Error retrieving members in segments: %s", err))
return 1
}
members = append(members, segmentMembers...)
}
}
// Filter the results // Filter the results
n := len(members) n := len(members)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
member := members[i] member := members[i]
if member.Tags["segment"] == "" {
member.Tags["segment"] = "<default>"
if member.Tags["role"] == "consul" {
member.Tags["segment"] = "<all>"
}
}
statusString := serf.MemberStatus(member.Status).String() statusString := serf.MemberStatus(member.Status).String()
if !statusRe.MatchString(statusString) { if !statusRe.MatchString(statusString) {
members[i], members[n-1] = members[n-1], members[i] members[i], members[n-1] = members[n-1], members[i]
@ -86,7 +113,7 @@ func (c *MembersCommand) Run(args []string) int {
return 2 return 2
} }
sort.Sort(ByMemberName(members)) sort.Sort(ByMemberNameAndSegment(members))
// Generate the output // Generate the output
var result []string var result []string
@ -104,17 +131,26 @@ func (c *MembersCommand) Run(args []string) int {
} }
// so we can sort members by name // so we can sort members by name
type ByMemberName []*consulapi.AgentMember type ByMemberNameAndSegment []*consulapi.AgentMember
func (m ByMemberName) Len() int { return len(m) } func (m ByMemberNameAndSegment) Len() int { return len(m) }
func (m ByMemberName) Swap(i, j int) { m[i], m[j] = m[j], m[i] } func (m ByMemberNameAndSegment) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByMemberName) Less(i, j int) bool { return m[i].Name < m[j].Name } func (m ByMemberNameAndSegment) Less(i, j int) bool {
switch {
case m[i].Tags["segment"] < m[j].Tags["segment"]:
return true
case m[i].Tags["segment"] > m[j].Tags["segment"]:
return false
default:
return m[i].Name < m[j].Name
}
}
// standardOutput is used to dump the most useful information about nodes // standardOutput is used to dump the most useful information about nodes
// in a more human-friendly format // in a more human-friendly format
func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []string { func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []string {
result := make([]string, 0, len(members)) result := make([]string, 0, len(members))
header := "Node|Address|Status|Type|Build|Protocol|DC" header := "Node|Address|Status|Type|Build|Protocol|DC|Segment"
result = append(result, header) result = append(result, header)
for _, member := range members { for _, member := range members {
addr := net.TCPAddr{IP: net.ParseIP(member.Addr), Port: int(member.Port)} addr := net.TCPAddr{IP: net.ParseIP(member.Addr), Port: int(member.Port)}
@ -126,19 +162,20 @@ func (c *MembersCommand) standardOutput(members []*consulapi.AgentMember) []stri
build = build[:idx] build = build[:idx]
} }
dc := member.Tags["dc"] dc := member.Tags["dc"]
segment := member.Tags["segment"]
statusString := serf.MemberStatus(member.Status).String() statusString := serf.MemberStatus(member.Status).String()
switch member.Tags["role"] { switch member.Tags["role"] {
case "node": case "node":
line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s", line := fmt.Sprintf("%s|%s|%s|client|%s|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc) member.Name, addr.String(), statusString, build, protocol, dc, segment)
result = append(result, line) result = append(result, line)
case "consul": case "consul":
line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s", line := fmt.Sprintf("%s|%s|%s|server|%s|%s|%s|%s",
member.Name, addr.String(), statusString, build, protocol, dc) member.Name, addr.String(), statusString, build, protocol, dc, segment)
result = append(result, line) result = append(result, line)
default: default:
line := fmt.Sprintf("%s|%s|%s|unknown|||", line := fmt.Sprintf("%s|%s|%s|unknown||||",
member.Name, addr.String(), statusString) member.Name, addr.String(), statusString)
result = append(result, line) result = append(result, line)
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
) )
@ -145,20 +146,23 @@ func (c *RTTCommand) Run(args []string) int {
return 1 return 1
} }
// See if the requested nodes are in there. // Index all the coordinates by segment.
cs1, cs2 := make(lib.CoordinateSet), make(lib.CoordinateSet)
for _, entry := range entries { for _, entry := range entries {
if entry.Node == nodes[0] { if entry.Node == nodes[0] {
coord1 = entry.Coord cs1[entry.Segment] = entry.Coord
} }
if entry.Node == nodes[1] { if entry.Node == nodes[1] {
coord2 = entry.Coord cs2[entry.Segment] = entry.Coord
}
} }
// See if there's a compatible set of coordinates.
coord1, coord2 = cs1.Intersect(cs2)
if coord1 != nil && coord2 != nil { if coord1 != nil && coord2 != nil {
goto SHOW_RTT goto SHOW_RTT
} }
} }
}
// Make sure we found both coordinates. // Make sure we found both coordinates.
if coord1 == nil { if coord1 == nil {

13
command/segment_stub.go Normal file
View File

@ -0,0 +1,13 @@
// +build !ent
package command
import (
consulapi "github.com/hashicorp/consul/api"
)
// getSegmentMembers returns an empty list since network segments are not
// supported in OSS Consul.
func getSegmentMembers(client *consulapi.Client) ([]*consulapi.AgentMember, error) {
return nil, nil
}

View File

@ -18,6 +18,39 @@ func ComputeDistance(a *coordinate.Coordinate, b *coordinate.Coordinate) float64
return a.DistanceTo(b).Seconds() return a.DistanceTo(b).Seconds()
} }
// CoordinateSet holds all the coordinates for a given node, indexed by network
// segment name.
type CoordinateSet map[string]*coordinate.Coordinate
// Intersect tries to return a pair of coordinates which are compatible with the
// current set and a given set. We employ some special knowledge about network
// segments to avoid doing a full intersection, since this is in several hot
// paths. This might return nil for either coordinate in the output pair if an
// intersection cannot be found. The ComputeDistance function above is designed
// to deal with that.
func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate, *coordinate.Coordinate) {
// Use the empty segment by default.
segment := ""
// If we have a single segment, then let our segment take priority since
// we are possibly a client. Any node with more than one segment can only
// be a server, which means it should be in all segments.
if len(cs) == 1 {
for s, _ := range cs {
segment = s
}
}
// Likewise for the other set.
if len(other) == 1 {
for s, _ := range other {
segment = s
}
}
return cs[segment], other[segment]
}
// GenerateCoordinate creates a new coordinate with the given distance from the // GenerateCoordinate creates a new coordinate with the given distance from the
// origin. This should only be used for tests. // origin. This should only be used for tests.
func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate { func GenerateCoordinate(rtt time.Duration) *coordinate.Coordinate {

View File

@ -6,49 +6,148 @@ import (
"time" "time"
"github.com/hashicorp/serf/coordinate" "github.com/hashicorp/serf/coordinate"
"github.com/pascaldekloe/goe/verify"
) )
func TestRTT(t *testing.T) { func TestRTT_ComputeDistance(t *testing.T) {
cases := []struct { tests := []struct {
desc string
a *coordinate.Coordinate a *coordinate.Coordinate
b *coordinate.Coordinate b *coordinate.Coordinate
dist float64 dist float64
}{ }{
{ {
"10 ms",
GenerateCoordinate(0), GenerateCoordinate(0),
GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond),
0.010, 0.010,
}, },
{ {
"0 ms",
GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond),
0.0, 0.0,
}, },
{ {
"2 ms",
GenerateCoordinate(8 * time.Millisecond), GenerateCoordinate(8 * time.Millisecond),
GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond),
0.002, 0.002,
}, },
{ {
"2 ms reversed",
GenerateCoordinate(10 * time.Millisecond), GenerateCoordinate(10 * time.Millisecond),
GenerateCoordinate(8 * time.Millisecond), GenerateCoordinate(8 * time.Millisecond),
0.002, 0.002,
}, },
{ {
"a nil",
nil, nil,
GenerateCoordinate(8 * time.Millisecond), GenerateCoordinate(8 * time.Millisecond),
math.Inf(1.0), math.Inf(1.0),
}, },
{ {
"b nil",
GenerateCoordinate(8 * time.Millisecond), GenerateCoordinate(8 * time.Millisecond),
nil, nil,
math.Inf(1.0), math.Inf(1.0),
}, },
{
"both nil",
nil,
nil,
math.Inf(1.0),
},
} }
for i, c := range cases { for _, tt := range tests {
dist := ComputeDistance(c.a, c.b) t.Run(tt.desc, func(t *testing.T) {
if c.dist != dist { dist := ComputeDistance(tt.a, tt.b)
t.Fatalf("bad (%d): %9.6f != %9.6f", i, c.dist, dist) verify.Values(t, "", dist, tt.dist)
} })
}
}
func TestRTT_Intersect(t *testing.T) {
// The numbers here don't matter, we just want a unique coordinate for
// each one.
server_1 := CoordinateSet{
"": GenerateCoordinate(1 * time.Millisecond),
"alpha": GenerateCoordinate(2 * time.Millisecond),
"beta": GenerateCoordinate(3 * time.Millisecond),
}
server_2 := CoordinateSet{
"": GenerateCoordinate(4 * time.Millisecond),
"alpha": GenerateCoordinate(5 * time.Millisecond),
"beta": GenerateCoordinate(6 * time.Millisecond),
}
client_alpha := CoordinateSet{
"alpha": GenerateCoordinate(7 * time.Millisecond),
}
client_beta_1 := CoordinateSet{
"beta": GenerateCoordinate(8 * time.Millisecond),
}
client_beta_2 := CoordinateSet{
"beta": GenerateCoordinate(9 * time.Millisecond),
}
tests := []struct {
desc string
a CoordinateSet
b CoordinateSet
c1 *coordinate.Coordinate
c2 *coordinate.Coordinate
}{
{
"nil maps",
nil, nil,
nil, nil,
},
{
"two servers",
server_1, server_2,
server_1[""], server_2[""],
},
{
"two clients",
client_beta_1, client_beta_2,
client_beta_1["beta"], client_beta_2["beta"],
},
{
"server_1 and client alpha",
server_1, client_alpha,
server_1["alpha"], client_alpha["alpha"],
},
{
"server_1 and client beta 1",
server_1, client_beta_1,
server_1["beta"], client_beta_1["beta"],
},
{
"server_1 and client alpha reversed",
client_alpha, server_1,
client_alpha["alpha"], server_1["alpha"],
},
{
"server_1 and client beta 1 reversed",
client_beta_1, server_1,
client_beta_1["beta"], server_1["beta"],
},
{
"nothing in common",
client_alpha, client_beta_1,
nil, client_beta_1["beta"],
},
{
"nothing in common reversed",
client_beta_1, client_alpha,
nil, client_alpha["alpha"],
},
}
for _, tt := range tests {
t.Run(tt.desc, func(t *testing.T) {
r1, r2 := tt.a.Intersect(tt.b)
verify.Values(t, "", r1, tt.c1)
verify.Values(t, "", r2, tt.c2)
})
} }
} }

View File

@ -58,6 +58,14 @@ type TestAddressConfig struct {
HTTP string `json:"http,omitempty"` HTTP string `json:"http,omitempty"`
} }
// TestNetworkSegment contains the configuration for a network segment.
type TestNetworkSegment struct {
Name string `json:"name"`
Bind string `json:"bind"`
Port int `json:"port"`
Advertise string `json:"advertise"`
}
// TestServerConfig is the main server configuration struct. // TestServerConfig is the main server configuration struct.
type TestServerConfig struct { type TestServerConfig struct {
NodeName string `json:"node_name"` NodeName string `json:"node_name"`
@ -68,6 +76,7 @@ type TestServerConfig struct {
Server bool `json:"server,omitempty"` Server bool `json:"server,omitempty"`
DataDir string `json:"data_dir,omitempty"` DataDir string `json:"data_dir,omitempty"`
Datacenter string `json:"datacenter,omitempty"` Datacenter string `json:"datacenter,omitempty"`
Segments []TestNetworkSegment `json:"segments"`
DisableCheckpoint bool `json:"disable_update_check"` DisableCheckpoint bool `json:"disable_update_check"`
LogLevel string `json:"log_level,omitempty"` LogLevel string `json:"log_level,omitempty"`
Bind string `json:"bind_addr,omitempty"` Bind string `json:"bind_addr,omitempty"`

View File

@ -108,6 +108,7 @@ $ curl \
[ [
{ {
"Node": "agent-one", "Node": "agent-one",
"Segment": "",
"Coord": { "Coord": {
"Adjustment": 0, "Adjustment": 0,
"Error": 1.5, "Error": 1.5,
@ -117,3 +118,7 @@ $ curl \
} }
] ]
``` ```
In **Consul Enterprise**, this may include multiple coordinates for the same node,
each marked with a different `Segment`. Coordinates are only compatible within the same
segment.