mirror of https://github.com/hashicorp/consul
Merge pull request #1698 from hashicorp/pr-1547-slackpad
Implements WAN address translation.pull/1699/head
commit
d2cc2801fa
|
@ -161,6 +161,11 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|||
config.AdvertiseAddrWan = config.AdvertiseAddr
|
||||
}
|
||||
|
||||
// Create the default set of tagged addresses.
|
||||
config.TaggedAddresses = map[string]string{
|
||||
"wan": config.AdvertiseAddrWan,
|
||||
}
|
||||
|
||||
agent := &Agent{
|
||||
config: config,
|
||||
logger: log.New(logOutput, "", log.LstdFlags),
|
||||
|
|
|
@ -168,6 +168,12 @@ func TestAgent_CheckAdvertiseAddrsSettings(t *testing.T) {
|
|||
if rpc != c.AdvertiseAddrs.RPC {
|
||||
t.Fatalf("RPC is not properly set to %v: %s", c.AdvertiseAddrs.RPC, rpc)
|
||||
}
|
||||
expected := map[string]string{
|
||||
"wan": agent.config.AdvertiseAddrWan,
|
||||
}
|
||||
if !reflect.DeepEqual(agent.config.TaggedAddresses, expected) {
|
||||
t.Fatalf("Tagged addresses not set up properly: %v", agent.config.TaggedAddresses)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgent_AddService(t *testing.T) {
|
||||
|
|
|
@ -80,6 +80,7 @@ func TestRetryJoin(t *testing.T) {
|
|||
"-server",
|
||||
"-data-dir", tmpDir,
|
||||
"-node", fmt.Sprintf(`"%s"`, conf2.NodeName),
|
||||
"-advertise", agent.config.BindAddr,
|
||||
"-retry-join", serfAddr,
|
||||
"-retry-interval", "1s",
|
||||
"-retry-join-wan", serfWanAddr,
|
||||
|
|
|
@ -189,12 +189,25 @@ type Config struct {
|
|||
// Serf WAN IP. If not specified, the general advertise address is used.
|
||||
AdvertiseAddrWan string `mapstructure:"advertise_addr_wan"`
|
||||
|
||||
// TranslateWanAddrs controls whether or not Consul should prefer
|
||||
// the "wan" tagged address when doing lookups in remote datacenters.
|
||||
// See TaggedAddresses below for more details.
|
||||
TranslateWanAddrs bool `mapstructure:"translate_wan_addrs"`
|
||||
|
||||
// Port configurations
|
||||
Ports PortConfig
|
||||
|
||||
// Address configurations
|
||||
Addresses AddressConfig
|
||||
|
||||
// Tagged addresses. These are used to publish a set of addresses for
|
||||
// for a node, which can be used by the remote agent. We currently
|
||||
// populate only the "wan" tag based on the SerfWan advertise address,
|
||||
// but this structure is here for possible future features with other
|
||||
// user-defined tags. The "wan" tag will be used by remote agents if
|
||||
// they are configured with TranslateWanAddrs set to true.
|
||||
TaggedAddresses map[string]string
|
||||
|
||||
// LeaveOnTerm controls if Serf does a graceful leave when receiving
|
||||
// the TERM signal. Defaults false. This can be changed on reload.
|
||||
LeaveOnTerm bool `mapstructure:"leave_on_terminate"`
|
||||
|
@ -968,6 +981,9 @@ func MergeConfig(a, b *Config) *Config {
|
|||
if b.AdvertiseAddrWan != "" {
|
||||
result.AdvertiseAddrWan = b.AdvertiseAddrWan
|
||||
}
|
||||
if b.TranslateWanAddrs == true {
|
||||
result.TranslateWanAddrs = true
|
||||
}
|
||||
if b.AdvertiseAddrs.SerfLan != nil {
|
||||
result.AdvertiseAddrs.SerfLan = b.AdvertiseAddrs.SerfLan
|
||||
result.AdvertiseAddrs.SerfLanRaw = b.AdvertiseAddrs.SerfLanRaw
|
||||
|
|
|
@ -253,6 +253,25 @@ func TestDecodeConfig(t *testing.T) {
|
|||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// WAN address translation disabled by default
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(`{}`)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if config.TranslateWanAddrs != false {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// WAN address translation
|
||||
input = `{"translate_wan_addrs": true}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if config.TranslateWanAddrs != true {
|
||||
t.Fatalf("bad: %#v", config)
|
||||
}
|
||||
|
||||
// leave_on_terminate
|
||||
input = `{"leave_on_terminate": true}`
|
||||
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
|
||||
|
|
|
@ -363,6 +363,19 @@ INVALID:
|
|||
resp.SetRcode(req, dns.RcodeNameError)
|
||||
}
|
||||
|
||||
// translateAddr is used to provide the final, translated address for a node,
|
||||
// depending on how this agent and the other node are configured.
|
||||
func (d *DNSServer) translateAddr(dc string, node *structs.Node) string {
|
||||
addr := node.Address
|
||||
if d.agent.config.TranslateWanAddrs && (d.agent.config.Datacenter != dc) {
|
||||
wanAddr := node.TaggedAddresses["wan"]
|
||||
if wanAddr != "" {
|
||||
addr = wanAddr
|
||||
}
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
// nodeLookup is used to handle a node query
|
||||
func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns.Msg) {
|
||||
// Only handle ANY, A and AAAA type requests
|
||||
|
@ -403,7 +416,8 @@ RPC:
|
|||
}
|
||||
|
||||
// Add the node record
|
||||
records := d.formatNodeRecord(out.NodeServices.Node, out.NodeServices.Node.Address,
|
||||
addr := d.translateAddr(datacenter, out.NodeServices.Node)
|
||||
records := d.formatNodeRecord(out.NodeServices.Node, addr,
|
||||
req.Question[0].Name, qType, d.config.NodeTTL)
|
||||
if records != nil {
|
||||
resp.Answer = append(resp.Answer, records...)
|
||||
|
@ -526,7 +540,7 @@ RPC:
|
|||
|
||||
// Add various responses depending on the request
|
||||
qType := req.Question[0].Qtype
|
||||
d.serviceNodeRecords(out.Nodes, req, resp, ttl)
|
||||
d.serviceNodeRecords(datacenter, out.Nodes, req, resp, ttl)
|
||||
|
||||
if qType == dns.TypeSRV {
|
||||
d.serviceSRVRecords(datacenter, out.Nodes, req, resp, ttl)
|
||||
|
@ -622,7 +636,7 @@ RPC:
|
|||
|
||||
// Add various responses depending on the request.
|
||||
qType := req.Question[0].Qtype
|
||||
d.serviceNodeRecords(out.Nodes, req, resp, ttl)
|
||||
d.serviceNodeRecords(datacenter, out.Nodes, req, resp, ttl)
|
||||
if qType == dns.TypeSRV {
|
||||
d.serviceSRVRecords(datacenter, out.Nodes, req, resp, ttl)
|
||||
}
|
||||
|
@ -646,18 +660,20 @@ RPC:
|
|||
}
|
||||
|
||||
// serviceNodeRecords is used to add the node records for a service lookup
|
||||
func (d *DNSServer) serviceNodeRecords(nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration) {
|
||||
func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration) {
|
||||
qName := req.Question[0].Name
|
||||
qType := req.Question[0].Qtype
|
||||
handled := make(map[string]struct{})
|
||||
for _, node := range nodes {
|
||||
// Avoid duplicate entries, possible if a node has
|
||||
// the same service on multiple ports, etc.
|
||||
addr := node.Node.Address
|
||||
// Start with the translated address but use the service address,
|
||||
// if specified.
|
||||
addr := d.translateAddr(dc, node.Node)
|
||||
if node.Service.Address != "" {
|
||||
addr = node.Service.Address
|
||||
}
|
||||
|
||||
// Avoid duplicate entries, possible if a node has
|
||||
// the same service on multiple ports, etc.
|
||||
if _, ok := handled[addr]; ok {
|
||||
continue
|
||||
}
|
||||
|
@ -698,8 +714,9 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
|
|||
}
|
||||
resp.Answer = append(resp.Answer, srvRec)
|
||||
|
||||
// Determine advertised address
|
||||
addr := node.Node.Address
|
||||
// Start with the translated address but use the service address,
|
||||
// if specified.
|
||||
addr := d.translateAddr(dc, node.Node)
|
||||
if node.Service.Address != "" {
|
||||
addr = node.Service.Address
|
||||
}
|
||||
|
|
|
@ -117,6 +117,9 @@ func TestDNS_NodeLookup(t *testing.T) {
|
|||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
|
@ -715,6 +718,194 @@ func TestDNS_ServiceLookup_ServiceAddress(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestDNS_ServiceLookup_WanAddress(t *testing.T) {
|
||||
dir1, srv1 := makeDNSServerConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
}, nil)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer srv1.Shutdown()
|
||||
|
||||
dir2, srv2 := makeDNSServerConfig(t, func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.TranslateWanAddrs = true
|
||||
}, nil)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer srv2.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, srv1.agent.RPC, "dc1")
|
||||
testutil.WaitForLeader(t, srv2.agent.RPC, "dc2")
|
||||
|
||||
// Join WAN cluster
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
srv1.agent.config.Ports.SerfWan)
|
||||
if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForResult(
|
||||
func() (bool, error) {
|
||||
return len(srv1.agent.WANMembers()) > 1, nil
|
||||
},
|
||||
func(err error) {
|
||||
t.Fatalf("Failed waiting for WAN join: %v", err)
|
||||
})
|
||||
|
||||
// Register a remote node with a service.
|
||||
{
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc2",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "db",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Register an equivalent prepared query.
|
||||
var id string
|
||||
{
|
||||
args := &structs.PreparedQueryRequest{
|
||||
Datacenter: "dc2",
|
||||
Op: structs.PreparedQueryCreate,
|
||||
Query: &structs.PreparedQuery{
|
||||
Service: structs.ServiceQuery{
|
||||
Service: "db",
|
||||
},
|
||||
},
|
||||
}
|
||||
if err := srv2.agent.RPC("PreparedQuery.Apply", args, &id); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Look up the SRV record via service and prepared query.
|
||||
questions := []string{
|
||||
"db.service.dc2.consul.",
|
||||
id + ".query.dc2.consul.",
|
||||
}
|
||||
for _, question := range questions {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion(question, dns.TypeSRV)
|
||||
|
||||
c := new(dns.Client)
|
||||
addr, _ := srv1.agent.config.ClientListener("", srv1.agent.config.Ports.DNS)
|
||||
in, _, err := c.Exchange(m, addr.String())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(in.Answer) != 1 {
|
||||
t.Fatalf("Bad: %#v", in)
|
||||
}
|
||||
|
||||
aRec, ok := in.Extra[0].(*dns.A)
|
||||
if !ok {
|
||||
t.Fatalf("Bad: %#v", in.Extra[0])
|
||||
}
|
||||
if aRec.Hdr.Name != "foo.node.dc2.consul." {
|
||||
t.Fatalf("Bad: %#v", in.Extra[0])
|
||||
}
|
||||
if aRec.A.String() != "127.0.0.2" {
|
||||
t.Fatalf("Bad: %#v", in.Extra[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Also check the A record directly
|
||||
for _, question := range questions {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion(question, dns.TypeA)
|
||||
|
||||
c := new(dns.Client)
|
||||
addr, _ := srv1.agent.config.ClientListener("", srv1.agent.config.Ports.DNS)
|
||||
in, _, err := c.Exchange(m, addr.String())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(in.Answer) != 1 {
|
||||
t.Fatalf("Bad: %#v", in)
|
||||
}
|
||||
|
||||
aRec, ok := in.Answer[0].(*dns.A)
|
||||
if !ok {
|
||||
t.Fatalf("Bad: %#v", in.Answer[0])
|
||||
}
|
||||
if aRec.Hdr.Name != question {
|
||||
t.Fatalf("Bad: %#v", in.Answer[0])
|
||||
}
|
||||
if aRec.A.String() != "127.0.0.2" {
|
||||
t.Fatalf("Bad: %#v", in.Answer[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Now query from the same DC and make sure we get the local address
|
||||
for _, question := range questions {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion(question, dns.TypeSRV)
|
||||
|
||||
c := new(dns.Client)
|
||||
addr, _ := srv2.agent.config.ClientListener("", srv2.agent.config.Ports.DNS)
|
||||
in, _, err := c.Exchange(m, addr.String())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(in.Answer) != 1 {
|
||||
t.Fatalf("Bad: %#v", in)
|
||||
}
|
||||
|
||||
aRec, ok := in.Extra[0].(*dns.A)
|
||||
if !ok {
|
||||
t.Fatalf("Bad: %#v", in.Extra[0])
|
||||
}
|
||||
if aRec.Hdr.Name != "foo.node.dc2.consul." {
|
||||
t.Fatalf("Bad: %#v", in.Extra[0])
|
||||
}
|
||||
if aRec.A.String() != "127.0.0.1" {
|
||||
t.Fatalf("Bad: %#v", in.Extra[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Also check the A record directly from DC2
|
||||
for _, question := range questions {
|
||||
m := new(dns.Msg)
|
||||
m.SetQuestion(question, dns.TypeA)
|
||||
|
||||
c := new(dns.Client)
|
||||
addr, _ := srv2.agent.config.ClientListener("", srv2.agent.config.Ports.DNS)
|
||||
in, _, err := c.Exchange(m, addr.String())
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(in.Answer) != 1 {
|
||||
t.Fatalf("Bad: %#v", in)
|
||||
}
|
||||
|
||||
aRec, ok := in.Answer[0].(*dns.A)
|
||||
if !ok {
|
||||
t.Fatalf("Bad: %#v", in.Answer[0])
|
||||
}
|
||||
if aRec.Hdr.Name != question {
|
||||
t.Fatalf("Bad: %#v", in.Answer[0])
|
||||
}
|
||||
if aRec.A.String() != "127.0.0.1" {
|
||||
t.Fatalf("Bad: %#v", in.Answer[0])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDNS_CaseInsensitiveServiceLookup(t *testing.T) {
|
||||
dir, srv := makeDNSServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
|
|
@ -3,6 +3,7 @@ package agent
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
@ -45,6 +46,10 @@ type localState struct {
|
|||
// iface is the consul interface to use for keeping in sync
|
||||
iface consul.Interface
|
||||
|
||||
// nodeInfoInSync tracks whether the server has our correct top-level
|
||||
// node information in sync (currently only used for tagged addresses)
|
||||
nodeInfoInSync bool
|
||||
|
||||
// Services tracks the local services
|
||||
services map[string]*structs.NodeService
|
||||
serviceStatus map[string]syncStatus
|
||||
|
@ -361,6 +366,13 @@ func (l *localState) setSyncState() error {
|
|||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// Check the node info (currently limited to tagged addresses since
|
||||
// everything else is managed by the Serf layer)
|
||||
if !reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) {
|
||||
l.nodeInfoInSync = false
|
||||
}
|
||||
|
||||
// Check all our services
|
||||
services := make(map[string]*structs.NodeService)
|
||||
if out1.NodeServices != nil {
|
||||
services = out1.NodeServices.Services
|
||||
|
@ -440,6 +452,10 @@ func (l *localState) syncChanges() error {
|
|||
l.Lock()
|
||||
defer l.Unlock()
|
||||
|
||||
// We will do node-level info syncing at the end, since it will get
|
||||
// updated by a service or check sync anyway, given how the register
|
||||
// API works.
|
||||
|
||||
// Sync the services
|
||||
for id, status := range l.serviceStatus {
|
||||
if status.remoteDelete {
|
||||
|
@ -475,6 +491,15 @@ func (l *localState) syncChanges() error {
|
|||
l.logger.Printf("[DEBUG] agent: Check '%s' in sync", id)
|
||||
}
|
||||
}
|
||||
|
||||
// Now sync the node level info if we need to, and didn't do any of
|
||||
// the other sync operations.
|
||||
if !l.nodeInfoInSync {
|
||||
if err := l.syncNodeInfo(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -523,11 +548,12 @@ func (l *localState) deleteCheck(id string) error {
|
|||
// syncService is used to sync a service to the server
|
||||
func (l *localState) syncService(id string) error {
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
Service: l.services[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
Service: l.services[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
|
||||
}
|
||||
|
||||
// If the service has associated checks that are out of sync,
|
||||
|
@ -553,6 +579,9 @@ func (l *localState) syncService(id string) error {
|
|||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.serviceStatus[id] = syncStatus{inSync: true}
|
||||
// Given how the register API works, this info is also updated
|
||||
// every time we sync a service.
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced service '%s'", id)
|
||||
for _, check := range checks {
|
||||
l.checkStatus[check.CheckID] = syncStatus{inSync: true}
|
||||
|
@ -580,17 +609,21 @@ func (l *localState) syncCheck(id string) error {
|
|||
}
|
||||
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
Service: service,
|
||||
Check: l.checks[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
Service: service,
|
||||
Check: l.checks[id],
|
||||
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
// Given how the register API works, this info is also updated
|
||||
// every time we sync a service.
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced check '%s'", id)
|
||||
} else if strings.Contains(err.Error(), permissionDenied) {
|
||||
l.checkStatus[id] = syncStatus{inSync: true}
|
||||
|
@ -599,3 +632,24 @@ func (l *localState) syncCheck(id string) error {
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (l *localState) syncNodeInfo() error {
|
||||
req := structs.RegisterRequest{
|
||||
Datacenter: l.config.Datacenter,
|
||||
Node: l.config.NodeName,
|
||||
Address: l.config.AdvertiseAddr,
|
||||
TaggedAddresses: l.config.TaggedAddresses,
|
||||
WriteRequest: structs.WriteRequest{Token: l.config.ACLToken},
|
||||
}
|
||||
var out struct{}
|
||||
err := l.iface.RPC("Catalog.Register", &req, &out)
|
||||
if err == nil {
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[INFO] agent: Synced node info")
|
||||
} else if strings.Contains(err.Error(), permissionDenied) {
|
||||
l.nodeInfoInSync = true
|
||||
l.logger.Printf("[WARN] agent: Node info update blocked by ACLs")
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -120,6 +120,12 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
|
|||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we sent along our tagged addresses when we synced.
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
|
||||
// We should have 6 services (consul included)
|
||||
if len(services.NodeServices.Services) != 6 {
|
||||
t.Fatalf("bad: %v", services.NodeServices.Services)
|
||||
|
@ -627,6 +633,23 @@ func TestAgentAntiEntropy_Checks(t *testing.T) {
|
|||
t.Fatalf("should be in sync: %v %v", name, status)
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure we sent along our tagged addresses when we synced.
|
||||
{
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var services structs.IndexedNodeServices
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
||||
|
@ -708,6 +731,66 @@ func TestAgentAntiEntropy_Check_DeferSync(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_NodeInfo(t *testing.T) {
|
||||
conf := nextConfig()
|
||||
dir, agent := makeAgent(t, conf)
|
||||
defer os.RemoveAll(dir)
|
||||
defer agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// Register info
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
Address: "127.0.0.1",
|
||||
}
|
||||
var out struct{}
|
||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync
|
||||
req := structs.NodeSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: agent.config.NodeName,
|
||||
}
|
||||
var services structs.IndexedNodeServices
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Make sure we synced our node info - this should have ridden on the
|
||||
// "consul" service sync
|
||||
addrs := services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
|
||||
// Blow away the catalog version of the node info
|
||||
if err := agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Trigger anti-entropy run and wait
|
||||
agent.StartSync()
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Verify that we are in sync - this should have been a sync of just the
|
||||
// node info
|
||||
if err := agent.RPC("Catalog.NodeServices", &req, &services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
addrs = services.NodeServices.Node.TaggedAddresses
|
||||
if len(addrs) == 0 || !reflect.DeepEqual(addrs, conf.TaggedAddresses) {
|
||||
t.Fatalf("bad: %v", addrs)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAgentAntiEntropy_deleteService_fails(t *testing.T) {
|
||||
l := new(localState)
|
||||
if err := l.deleteService(""); err == nil {
|
||||
|
@ -816,7 +899,7 @@ func TestAgent_sendCoordinate(t *testing.T) {
|
|||
testutil.WaitForLeader(t, agent.RPC, "dc1")
|
||||
|
||||
// Wait a little while for an update.
|
||||
time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
time.Sleep(3 * conf.ConsulConfig.CoordinateUpdatePeriod)
|
||||
|
||||
// Make sure the coordinate is present.
|
||||
req := structs.DCSpecificRequest{
|
||||
|
|
|
@ -472,8 +472,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
|
|||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||
n := node.(*structs.Node)
|
||||
req := structs.RegisterRequest{
|
||||
Node: n.Node,
|
||||
Address: n.Address,
|
||||
Node: n.Node,
|
||||
Address: n.Address,
|
||||
TaggedAddresses: n.TaggedAddresses,
|
||||
}
|
||||
|
||||
// Register the node itself
|
||||
|
|
|
@ -360,7 +360,7 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
|
||||
// Add some state
|
||||
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
|
||||
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2"})
|
||||
fsm.state.EnsureNode(2, &structs.Node{Node: "baz", Address: "127.0.0.2", TaggedAddresses: map[string]string{"hello": "1.2.3.4"}})
|
||||
fsm.state.EnsureService(3, "foo", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.1", Port: 80})
|
||||
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
||||
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
||||
|
@ -453,7 +453,18 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(nodes) != 2 {
|
||||
t.Fatalf("Bad: %v", nodes)
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
if nodes[0].Node != "baz" ||
|
||||
nodes[0].Address != "127.0.0.2" ||
|
||||
len(nodes[0].TaggedAddresses) != 1 ||
|
||||
nodes[0].TaggedAddresses["hello"] != "1.2.3.4" {
|
||||
t.Fatalf("bad: %v", nodes[0])
|
||||
}
|
||||
if nodes[1].Node != "foo" ||
|
||||
nodes[1].Address != "127.0.0.1" ||
|
||||
len(nodes[1].TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", nodes[1])
|
||||
}
|
||||
|
||||
_, fooSrv, err := fsm2.state.NodeServices("foo")
|
||||
|
|
|
@ -474,7 +474,11 @@ func (s *StateStore) EnsureRegistration(idx uint64, req *structs.RegisterRequest
|
|||
func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *DumbWatchManager,
|
||||
req *structs.RegisterRequest) error {
|
||||
// Add the node.
|
||||
node := &structs.Node{Node: req.Node, Address: req.Address}
|
||||
node := &structs.Node{
|
||||
Node: req.Node,
|
||||
Address: req.Address,
|
||||
TaggedAddresses: req.TaggedAddresses,
|
||||
}
|
||||
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
|
||||
return fmt.Errorf("failed inserting node: %s", err)
|
||||
}
|
||||
|
@ -1373,8 +1377,9 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
|
|||
|
||||
// Create the wrapped node
|
||||
dump := &structs.NodeInfo{
|
||||
Node: node.Node,
|
||||
Address: node.Address,
|
||||
Node: node.Node,
|
||||
Address: node.Address,
|
||||
TaggedAddresses: node.TaggedAddresses,
|
||||
}
|
||||
|
||||
// Query the node services
|
||||
|
|
|
@ -397,6 +397,9 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
req := &structs.RegisterRequest{
|
||||
Node: "node1",
|
||||
Address: "1.2.3.4",
|
||||
TaggedAddresses: map[string]string{
|
||||
"hello": "world",
|
||||
},
|
||||
}
|
||||
if err := s.EnsureRegistration(1, req); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
|
@ -409,6 +412,8 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if out.Node != "node1" || out.Address != "1.2.3.4" ||
|
||||
len(out.TaggedAddresses) != 1 ||
|
||||
out.TaggedAddresses["hello"] != "world" ||
|
||||
out.CreateIndex != created || out.ModifyIndex != modified {
|
||||
t.Fatalf("bad node returned: %#v", out)
|
||||
}
|
||||
|
|
|
@ -159,12 +159,13 @@ type QueryMeta struct {
|
|||
// to register a node as providing a service. If no service
|
||||
// is provided, the node is registered.
|
||||
type RegisterRequest struct {
|
||||
Datacenter string
|
||||
Node string
|
||||
Address string
|
||||
Service *NodeService
|
||||
Check *HealthCheck
|
||||
Checks HealthChecks
|
||||
Datacenter string
|
||||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
Service *NodeService
|
||||
Check *HealthCheck
|
||||
Checks HealthChecks
|
||||
WriteRequest
|
||||
}
|
||||
|
||||
|
@ -245,8 +246,9 @@ func (r *ChecksInStateRequest) RequestDatacenter() string {
|
|||
|
||||
// Used to return information about a node
|
||||
type Node struct {
|
||||
Node string
|
||||
Address string
|
||||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
|
||||
RaftIndex
|
||||
}
|
||||
|
@ -438,10 +440,11 @@ OUTER:
|
|||
// a node. This is currently used for the UI only, as it is
|
||||
// rather expensive to generate.
|
||||
type NodeInfo struct {
|
||||
Node string
|
||||
Address string
|
||||
Services []*NodeService
|
||||
Checks []*HealthCheck
|
||||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
Services []*NodeService
|
||||
Checks []*HealthCheck
|
||||
}
|
||||
|
||||
// NodeDump is used to dump all the nodes with all their
|
||||
|
|
|
@ -207,3 +207,12 @@ By default, all DNS results served by Consul set a 0 TTL value. This disables
|
|||
caching of DNS results. However, there are many situations in which caching is
|
||||
desirable for performance and scalability. This is discussed more in the guide
|
||||
for [DNS Caching](/docs/guides/dns-cache.html).
|
||||
|
||||
## WAN Address Translation
|
||||
|
||||
Be default, Consul DNS queries will return a node's local address, even when
|
||||
being queried from a remote datacenter. If you need to use a different address
|
||||
to reach a node from outside its datacenter, you can configure this behavior
|
||||
using the [`advertise-wan`](/docs/agent/options.html#_advertise-wan) and
|
||||
[`translate_wan_addrs`](/docs/agent/options.html#translate_wan_addrs) configuration
|
||||
options.
|
||||
|
|
|
@ -48,6 +48,9 @@ body must look something like:
|
|||
"v1"
|
||||
],
|
||||
"Address": "127.0.0.1",
|
||||
"TaggedAddresses": {
|
||||
"wan": "127.0.0.1"
|
||||
},
|
||||
"Port": 8000
|
||||
},
|
||||
"Check": {
|
||||
|
@ -64,7 +67,9 @@ body must look something like:
|
|||
The behavior of the endpoint depends on what keys are provided. The endpoint
|
||||
requires `Node` and `Address` to be provided while `Datacenter` will be defaulted
|
||||
to match that of the agent. If only those are provided, the endpoint will register
|
||||
the node with the catalog.
|
||||
the node with the catalog. `TaggedAddresses` can be used in conjunction with the
|
||||
[`translate_wan_addrs`](/docs/agent/options.html#translate_wan_addrs) configuration
|
||||
option. Currently only the "wan" tag is supported.
|
||||
|
||||
If the `Service` key is provided, the service will also be registered. If
|
||||
`ID` is not provided, it will be defaulted to the value of the `Service.Service` property.
|
||||
|
@ -191,10 +196,16 @@ It returns a JSON body like this:
|
|||
{
|
||||
"Node": "baz",
|
||||
"Address": "10.1.10.11"
|
||||
"TaggedAddresses": {
|
||||
"wan": "10.1.10.11"
|
||||
}
|
||||
},
|
||||
{
|
||||
"Node": "foobar",
|
||||
"Address": "10.1.10.12"
|
||||
"Address": "10.1.10.12",
|
||||
"TaggedAddresses": {
|
||||
"wan": "10.1.10.12"
|
||||
}
|
||||
}
|
||||
]
|
||||
```
|
||||
|
@ -271,7 +282,10 @@ It returns a JSON body like this:
|
|||
{
|
||||
"Node": {
|
||||
"Node": "foobar",
|
||||
"Address": "10.1.10.12"
|
||||
"Address": "10.1.10.12",
|
||||
"TaggedAddresses": {
|
||||
"wan": "10.1.10.12"
|
||||
}
|
||||
},
|
||||
"Services": {
|
||||
"consul": {
|
||||
|
|
|
@ -127,7 +127,10 @@ It returns a JSON body like this:
|
|||
{
|
||||
"Node": {
|
||||
"Node": "foobar",
|
||||
"Address": "10.1.10.12"
|
||||
"Address": "10.1.10.12",
|
||||
"TaggedAddresses": {
|
||||
"wan": "10.1.10.12"
|
||||
}
|
||||
},
|
||||
"Service": {
|
||||
"ID": "redis",
|
||||
|
|
|
@ -41,18 +41,22 @@ The options below are all specified on the command-line.
|
|||
If this address is not routable, the node will be in a constant flapping state
|
||||
as other nodes will treat the non-routability as a failure.
|
||||
|
||||
* <a name="_advertise-wan"></a><a href="#_advertise-wan">`-advertise-wan`</a> - The advertise wan
|
||||
address is used to change the address that we advertise to server nodes joining
|
||||
through the WAN. By default, the [`-advertise`](#_advertise) address is advertised.
|
||||
However, in some cases all members of all datacenters cannot be on the same
|
||||
physical or virtual network, especially on hybrid setups mixing cloud and private datacenters.
|
||||
This flag enables server nodes gossiping through the public network for the WAN while using
|
||||
private VLANs for gossiping to each other and their client agents.
|
||||
* <a name="_advertise-wan"></a><a href="#_advertise-wan">`-advertise-wan`</a> - The
|
||||
advertise WAN address is used to change the address that we advertise to server nodes
|
||||
joining through the WAN. This can also be set on client agents when used in combination
|
||||
with the <a href="#translate_wan_addrs">`translate_wan_addrs`</a> configuration
|
||||
option. By default, the [`-advertise`](#_advertise) address is advertised. However, in some
|
||||
cases all members of all datacenters cannot be on the same physical or virtual network,
|
||||
especially on hybrid setups mixing cloud and private datacenters. This flag enables server
|
||||
nodes gossiping through the public network for the WAN while using private VLANs for gossiping
|
||||
to each other and their client agents, and it allows client agents to be reached at this
|
||||
address when being accessed from a remote datacenter if the remote datacenter is configured
|
||||
with <a href="#translate_wan_addrs">`translate_wan_addrs`</a>.
|
||||
|
||||
* <a name="_atlas"></a><a href="#_atlas">`-atlas`</a> - This flag
|
||||
enables [Atlas](https://atlas.hashicorp.com) integration.
|
||||
It is used to provide the Atlas infrastructure name and the SCADA connection. The format of
|
||||
this is `username/environment`. This enables Atlas features such as the Monitoring UI
|
||||
It is used to provide the Atlas infrastructure name and the SCADA connection. The format of
|
||||
this is `username/environment`. This enables Atlas features such as the Monitoring UI
|
||||
and node auto joining.
|
||||
|
||||
* <a name="_atlas_join"></a><a href="#_atlas_join">`-atlas-join`</a> - When set, enables auto-join
|
||||
|
@ -623,6 +627,13 @@ definitions support being updated during a reload.
|
|||
[`enable_syslog`](#enable_syslog) is provided, this controls to which
|
||||
facility messages are sent. By default, `LOCAL0` will be used.
|
||||
|
||||
* <a name="translate_wan_addrs"</a><a href="#translate_wan_addrs">`translate_wan_addrs`</a> If
|
||||
set to true, Consul will prefer a node's configured <a href="#_advertise-wan">WAN address</a>
|
||||
when servicing DNS requests for a node in a remote datacenter. This allows the node to be
|
||||
reached within its own datacenter using its local address, and reached from other datacenters
|
||||
using its WAN address, which is useful in hybrid setups with mixed networks. This is disabled
|
||||
by default.
|
||||
|
||||
* <a name="ui"></a><a href="#ui">`ui`</a> - Equivalent to the [`-ui`](#_ui)
|
||||
command-line flag.
|
||||
|
||||
|
|
Loading…
Reference in New Issue