mirror of https://github.com/hashicorp/consul
Merge pull request #2275 from hashicorp/pr-2118-slackpad
Translates node addresses to WAN addresses where appropriate.pull/1433/merge
commit
affbb27416
|
@ -2,9 +2,10 @@ package agent
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
func (s *HTTPServer) CatalogRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
|
@ -72,6 +73,7 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
|
|||
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
translateAddresses(s.agent.config, args.Datacenter, out.Nodes)
|
||||
|
||||
// Use empty list instead of nil
|
||||
if out.Nodes == nil {
|
||||
|
@ -124,6 +126,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
|
|||
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
translateAddresses(s.agent.config, args.Datacenter, out.ServiceNodes)
|
||||
|
||||
// Use empty list instead of nil
|
||||
if out.ServiceNodes == nil {
|
||||
|
@ -153,5 +156,9 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
|
|||
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if out.NodeServices != nil && out.NodeServices.Node != nil {
|
||||
translateAddresses(s.agent.config, args.Datacenter, out.NodeServices.Node)
|
||||
}
|
||||
|
||||
return out.NodeServices, nil
|
||||
}
|
||||
|
|
|
@ -145,6 +145,112 @@ func TestCatalogNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalogNodes_WanTranslation(t *testing.T) {
|
||||
dir1, srv1 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer srv1.Shutdown()
|
||||
defer srv1.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv1.agent.RPC, "dc1")
|
||||
|
||||
dir2, srv2 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer srv2.Shutdown()
|
||||
defer srv2.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv2.agent.RPC, "dc2")
|
||||
|
||||
// Wait for the WAN join.
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
srv1.agent.config.Ports.SerfWan)
|
||||
if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
testutil.WaitForResult(
|
||||
func() (bool, error) {
|
||||
return len(srv1.agent.WANMembers()) > 1, nil
|
||||
},
|
||||
func(err error) {
|
||||
t.Fatalf("Failed waiting for WAN join: %v", err)
|
||||
})
|
||||
|
||||
// Register a node with DC2.
|
||||
{
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc2",
|
||||
Node: "wan_translation_test",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "http_wan_translation_test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Query nodes in DC2 from DC1.
|
||||
req, err := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc2", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp1 := httptest.NewRecorder()
|
||||
obj1, err1 := srv1.CatalogNodes(resp1, req)
|
||||
if err1 != nil {
|
||||
t.Fatalf("err: %v", err1)
|
||||
}
|
||||
assertIndex(t, resp1)
|
||||
|
||||
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
||||
nodes1 := obj1.(structs.Nodes)
|
||||
if len(nodes1) != 2 {
|
||||
t.Fatalf("bad: %v", obj1)
|
||||
}
|
||||
var address string
|
||||
for _, node := range nodes1 {
|
||||
if node.Node == "wan_translation_test" {
|
||||
address = node.Address
|
||||
}
|
||||
}
|
||||
if address != "127.0.0.2" {
|
||||
t.Fatalf("bad: %s", address)
|
||||
}
|
||||
|
||||
// Query DC2 from DC2.
|
||||
resp2 := httptest.NewRecorder()
|
||||
obj2, err2 := srv2.CatalogNodes(resp2, req)
|
||||
if err2 != nil {
|
||||
t.Fatalf("err: %v", err2)
|
||||
}
|
||||
assertIndex(t, resp2)
|
||||
|
||||
// Expect that DC2 gives us a private address (since the node is in DC2).
|
||||
nodes2 := obj2.(structs.Nodes)
|
||||
if len(nodes2) != 2 {
|
||||
t.Fatalf("bad: %v", obj2)
|
||||
}
|
||||
for _, node := range nodes2 {
|
||||
if node.Node == "wan_translation_test" {
|
||||
address = node.Address
|
||||
}
|
||||
}
|
||||
if address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %s", address)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalogNodes_Blocking(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
@ -407,6 +513,103 @@ func TestCatalogServiceNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
|
||||
dir1, srv1 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer srv1.Shutdown()
|
||||
defer srv1.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv1.agent.RPC, "dc1")
|
||||
|
||||
dir2, srv2 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer srv2.Shutdown()
|
||||
defer srv2.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv2.agent.RPC, "dc2")
|
||||
|
||||
// Wait for the WAN join.
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
srv1.agent.config.Ports.SerfWan)
|
||||
if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
testutil.WaitForResult(
|
||||
func() (bool, error) {
|
||||
return len(srv1.agent.WANMembers()) > 1, nil
|
||||
},
|
||||
func(err error) {
|
||||
t.Fatalf("Failed waiting for WAN join: %v", err)
|
||||
})
|
||||
|
||||
// Register a node with DC2.
|
||||
{
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc2",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "http_wan_translation_test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Query for the node in DC2 from DC1.
|
||||
req, err := http.NewRequest("GET", "/v1/catalog/service/http_wan_translation_test?dc=dc2", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp1 := httptest.NewRecorder()
|
||||
obj1, err1 := srv1.CatalogServiceNodes(resp1, req)
|
||||
if err1 != nil {
|
||||
t.Fatalf("err: %v", err1)
|
||||
}
|
||||
assertIndex(t, resp1)
|
||||
|
||||
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
||||
nodes1 := obj1.(structs.ServiceNodes)
|
||||
if len(nodes1) != 1 {
|
||||
t.Fatalf("bad: %v", obj1)
|
||||
}
|
||||
node1 := nodes1[0]
|
||||
if node1.Address != "127.0.0.2" {
|
||||
t.Fatalf("bad: %v", node1)
|
||||
}
|
||||
|
||||
// Query DC2 from DC2.
|
||||
resp2 := httptest.NewRecorder()
|
||||
obj2, err2 := srv2.CatalogServiceNodes(resp2, req)
|
||||
if err2 != nil {
|
||||
t.Fatalf("err: %v", err2)
|
||||
}
|
||||
assertIndex(t, resp2)
|
||||
|
||||
// Expect that DC2 gives us a local address (since the node is in DC2).
|
||||
nodes2 := obj2.(structs.ServiceNodes)
|
||||
if len(nodes2) != 1 {
|
||||
t.Fatalf("bad: %v", obj2)
|
||||
}
|
||||
node2 := nodes2[0]
|
||||
if node2.Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", node2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
@ -550,3 +753,99 @@ func TestCatalogNodeServices(t *testing.T) {
|
|||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalogNodeServices_WanTranslation(t *testing.T) {
|
||||
dir1, srv1 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer srv1.Shutdown()
|
||||
defer srv1.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv1.agent.RPC, "dc1")
|
||||
|
||||
dir2, srv2 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer srv2.Shutdown()
|
||||
defer srv2.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv2.agent.RPC, "dc2")
|
||||
|
||||
// Wait for the WAN join.
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
srv1.agent.config.Ports.SerfWan)
|
||||
if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
testutil.WaitForResult(
|
||||
func() (bool, error) {
|
||||
return len(srv1.agent.WANMembers()) > 1, nil
|
||||
},
|
||||
func(err error) {
|
||||
t.Fatalf("Failed waiting for WAN join: %v", err)
|
||||
})
|
||||
|
||||
// Register a node with DC2.
|
||||
{
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc2",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "http_wan_translation_test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Query for the node in DC2 from DC1.
|
||||
req, err := http.NewRequest("GET", "/v1/catalog/node/foo?dc=dc2", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp1 := httptest.NewRecorder()
|
||||
obj1, err1 := srv1.CatalogNodeServices(resp1, req)
|
||||
if err1 != nil {
|
||||
t.Fatalf("err: %v", err1)
|
||||
}
|
||||
assertIndex(t, resp1)
|
||||
|
||||
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
||||
services1 := obj1.(*structs.NodeServices)
|
||||
if len(services1.Services) != 1 {
|
||||
t.Fatalf("bad: %v", obj1)
|
||||
}
|
||||
service1 := services1.Node
|
||||
if service1.Address != "127.0.0.2" {
|
||||
t.Fatalf("bad: %v", service1)
|
||||
}
|
||||
|
||||
// Query DC2 from DC2.
|
||||
resp2 := httptest.NewRecorder()
|
||||
obj2, err2 := srv2.CatalogNodeServices(resp2, req)
|
||||
if err2 != nil {
|
||||
t.Fatalf("err: %v", err2)
|
||||
}
|
||||
assertIndex(t, resp2)
|
||||
|
||||
// Expect that DC2 gives us a private address (since the node is in DC2).
|
||||
services2 := obj2.(*structs.NodeServices)
|
||||
if len(services2.Services) != 1 {
|
||||
t.Fatalf("bad: %v", obj2)
|
||||
}
|
||||
service2 := services2.Node
|
||||
if service2.Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", service2)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -370,19 +370,6 @@ INVALID:
|
|||
resp.SetRcode(req, dns.RcodeNameError)
|
||||
}
|
||||
|
||||
// translateAddr is used to provide the final, translated address for a node,
|
||||
// depending on how this agent and the other node are configured.
|
||||
func (d *DNSServer) translateAddr(dc string, node *structs.Node) string {
|
||||
addr := node.Address
|
||||
if d.agent.config.TranslateWanAddrs && (d.agent.config.Datacenter != dc) {
|
||||
wanAddr := node.TaggedAddresses["wan"]
|
||||
if wanAddr != "" {
|
||||
addr = wanAddr
|
||||
}
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
// nodeLookup is used to handle a node query
|
||||
func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns.Msg) {
|
||||
// Only handle ANY, A and AAAA type requests
|
||||
|
@ -423,7 +410,8 @@ RPC:
|
|||
}
|
||||
|
||||
// Add the node record
|
||||
addr := d.translateAddr(datacenter, out.NodeServices.Node)
|
||||
n := out.NodeServices.Node
|
||||
addr := translateAddress(d.agent.config, datacenter, n.Address, n.TaggedAddresses)
|
||||
records := d.formatNodeRecord(out.NodeServices.Node, addr,
|
||||
req.Question[0].Name, qType, d.config.NodeTTL)
|
||||
if records != nil {
|
||||
|
@ -776,7 +764,7 @@ func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNode
|
|||
for _, node := range nodes {
|
||||
// Start with the translated address but use the service address,
|
||||
// if specified.
|
||||
addr := d.translateAddr(dc, node.Node)
|
||||
addr := translateAddress(d.agent.config, dc, node.Node.Address, node.Node.TaggedAddresses)
|
||||
if node.Service.Address != "" {
|
||||
addr = node.Service.Address
|
||||
}
|
||||
|
@ -825,7 +813,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
|
|||
|
||||
// Start with the translated address but use the service address,
|
||||
// if specified.
|
||||
addr := d.translateAddr(dc, node.Node)
|
||||
addr := translateAddress(d.agent.config, dc, node.Node.Address, node.Node.TaggedAddresses)
|
||||
if node.Service.Address != "" {
|
||||
addr = node.Service.Address
|
||||
}
|
||||
|
|
|
@ -131,6 +131,9 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
|
|||
out.Nodes = filterNonPassing(out.Nodes)
|
||||
}
|
||||
|
||||
// Translate addresses after filtering so we don't waste effort.
|
||||
translateAddresses(s.agent.config, args.Datacenter, out.Nodes)
|
||||
|
||||
// Use empty list instead of nil
|
||||
for i, _ := range out.Nodes {
|
||||
// TODO (slackpad) It's lame that this isn't a slice of pointers
|
||||
|
@ -143,6 +146,7 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
|
|||
if out.Nodes == nil {
|
||||
out.Nodes = make(structs.CheckServiceNodes, 0)
|
||||
}
|
||||
|
||||
return out.Nodes, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -554,6 +554,102 @@ func TestHealthServiceNodes_PassingFilter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceNodes_WanTranslation(t *testing.T) {
|
||||
dir1, srv1 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer srv1.Shutdown()
|
||||
defer srv1.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv1.agent.RPC, "dc1")
|
||||
|
||||
dir2, srv2 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
c.Datacenter = "dc2"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
defer os.RemoveAll(dir2)
|
||||
defer srv2.Shutdown()
|
||||
defer srv2.agent.Shutdown()
|
||||
testutil.WaitForLeader(t, srv2.agent.RPC, "dc2")
|
||||
|
||||
// Wait for the WAN join.
|
||||
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||
srv1.agent.config.Ports.SerfWan)
|
||||
if _, err := srv2.agent.JoinWAN([]string{addr}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
testutil.WaitForResult(
|
||||
func() (bool, error) {
|
||||
return len(srv1.agent.WANMembers()) > 1, nil
|
||||
},
|
||||
func(err error) {
|
||||
t.Fatalf("Failed waiting for WAN join: %v", err)
|
||||
})
|
||||
|
||||
// Register a node with DC2.
|
||||
{
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc2",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "http_wan_translation_test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv2.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Query for a service in DC2 from DC1.
|
||||
req, err := http.NewRequest("GET", "/v1/health/service/http_wan_translation_test?dc=dc2", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
resp1 := httptest.NewRecorder()
|
||||
obj1, err1 := srv1.HealthServiceNodes(resp1, req)
|
||||
if err1 != nil {
|
||||
t.Fatalf("err: %v", err1)
|
||||
}
|
||||
assertIndex(t, resp1)
|
||||
|
||||
// Expect that DC1 gives us a WAN address (since the node is in DC2).
|
||||
nodes1 := obj1.(structs.CheckServiceNodes)
|
||||
if len(nodes1) != 1 {
|
||||
t.Fatalf("bad: %v", obj1)
|
||||
}
|
||||
node1 := nodes1[0].Node
|
||||
if node1.Address != "127.0.0.2" {
|
||||
t.Fatalf("bad: %v", node1)
|
||||
}
|
||||
|
||||
// Query DC2 from DC2.
|
||||
resp2 := httptest.NewRecorder()
|
||||
obj2, err2 := srv2.HealthServiceNodes(resp2, req)
|
||||
if err2 != nil {
|
||||
t.Fatalf("err: %v", err2)
|
||||
}
|
||||
assertIndex(t, resp2)
|
||||
|
||||
// Expect that DC2 gives us a private address (since the node is in DC2).
|
||||
nodes2 := obj2.(structs.CheckServiceNodes)
|
||||
if len(nodes2) != 1 {
|
||||
t.Fatalf("bad: %v", obj2)
|
||||
}
|
||||
node2 := nodes2[0].Node
|
||||
if node2.Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", node2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterNonPassing(t *testing.T) {
|
||||
nodes := structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
|
|
|
@ -122,6 +122,12 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Note that we translate using the DC that the results came from, since
|
||||
// a query can fail over to a different DC than where the execute request
|
||||
// was sent to. That's why we use the reply's DC and not the one from
|
||||
// the args.
|
||||
translateAddresses(s.agent.config, reply.Datacenter, reply.Nodes)
|
||||
|
||||
// Use empty list instead of nil.
|
||||
if reply.Nodes == nil {
|
||||
reply.Nodes = make(structs.CheckServiceNodes, 0)
|
||||
|
|
|
@ -359,6 +359,108 @@ func TestPreparedQuery_Execute(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
// Ensure WAN translation occurs for a response outside of the local DC.
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
m := MockPreparedQuery{}
|
||||
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||
nodesResponse := make(structs.CheckServiceNodes, 1)
|
||||
nodesResponse[0].Node = &structs.Node{
|
||||
Node: "foo", Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
}
|
||||
reply.Nodes = nodesResponse
|
||||
reply.Datacenter = "dc2"
|
||||
return nil
|
||||
}
|
||||
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/query/my-id/execute?dc=dc2", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.PreparedQuerySpecific(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
r, ok := obj.(structs.PreparedQueryExecuteResponse)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected: %T", obj)
|
||||
}
|
||||
if r.Nodes == nil || len(r.Nodes) != 1 {
|
||||
t.Fatalf("bad: %v", r)
|
||||
}
|
||||
|
||||
node := r.Nodes[0]
|
||||
if node.Node.Address != "127.0.0.2" {
|
||||
t.Fatalf("bad: %v", node.Node)
|
||||
}
|
||||
}, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
|
||||
// Ensure WAN translation doesn't occur for the local DC.
|
||||
httpTestWithConfig(t, func(srv *HTTPServer) {
|
||||
m := MockPreparedQuery{}
|
||||
if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
|
||||
nodesResponse := make(structs.CheckServiceNodes, 1)
|
||||
nodesResponse[0].Node = &structs.Node{
|
||||
Node: "foo", Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"wan": "127.0.0.2",
|
||||
},
|
||||
}
|
||||
reply.Nodes = nodesResponse
|
||||
reply.Datacenter = "dc1"
|
||||
return nil
|
||||
}
|
||||
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/query/my-id/execute?dc=dc2", body)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.PreparedQuerySpecific(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if resp.Code != 200 {
|
||||
t.Fatalf("bad code: %d", resp.Code)
|
||||
}
|
||||
r, ok := obj.(structs.PreparedQueryExecuteResponse)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected: %T", obj)
|
||||
}
|
||||
if r.Nodes == nil || len(r.Nodes) != 1 {
|
||||
t.Fatalf("bad: %v", r)
|
||||
}
|
||||
|
||||
node := r.Nodes[0]
|
||||
if node.Node.Address != "127.0.0.1" {
|
||||
t.Fatalf("bad: %v", node.Node)
|
||||
}
|
||||
}, func(c *Config) {
|
||||
c.Datacenter = "dc1"
|
||||
c.TranslateWanAddrs = true
|
||||
})
|
||||
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
body := bytes.NewBuffer(nil)
|
||||
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
)
|
||||
|
||||
// translateAddress is used to provide the final, translated address for a node,
|
||||
// depending on how the agent and the other node are configured. The dc
|
||||
// parameter is the dc the datacenter this node is from.
|
||||
func translateAddress(config *Config, dc string, addr string, taggedAddresses map[string]string) string {
|
||||
if config.TranslateWanAddrs && (config.Datacenter != dc) {
|
||||
wanAddr := taggedAddresses["wan"]
|
||||
if wanAddr != "" {
|
||||
addr = wanAddr
|
||||
}
|
||||
}
|
||||
return addr
|
||||
}
|
||||
|
||||
// translateAddresses translates addresses in the given structure into the
|
||||
// final, translated address, depending on how the agent and the other node are
|
||||
// configured. The dc parameter is the datacenter this structure is from.
|
||||
func translateAddresses(config *Config, dc string, subj interface{}) {
|
||||
// CAUTION - SUBTLE! An agent running on a server can, in some cases,
|
||||
// return pointers directly into the immutable state store for
|
||||
// performance (it's via the in-memory RPC mechanism). It's never safe
|
||||
// to modify those values, so we short circuit here so that we never
|
||||
// update any structures that are from our own datacenter. This works
|
||||
// for address translation because we *never* need to translate local
|
||||
// addresses, but this is super subtle, so we've piped all the in-place
|
||||
// address translation into this function which makes sure this check is
|
||||
// done. This also happens to skip looking at any of the incoming
|
||||
// structure for the common case of not needing to translate, so it will
|
||||
// skip a lot of work if no translation needs to be done.
|
||||
if !config.TranslateWanAddrs || (config.Datacenter == dc) {
|
||||
return
|
||||
}
|
||||
|
||||
// Translate addresses in-place, subject to the condition checked above
|
||||
// which ensures this is safe to do since we are operating on a local
|
||||
// copy of the data.
|
||||
switch v := subj.(type) {
|
||||
case structs.CheckServiceNodes:
|
||||
for _, entry := range v {
|
||||
entry.Node.Address = translateAddress(config, dc,
|
||||
entry.Node.Address, entry.Node.TaggedAddresses)
|
||||
}
|
||||
case *structs.Node:
|
||||
v.Address = translateAddress(config, dc,
|
||||
v.Address, v.TaggedAddresses)
|
||||
case structs.Nodes:
|
||||
for _, node := range v {
|
||||
node.Address = translateAddress(config, dc,
|
||||
node.Address, node.TaggedAddresses)
|
||||
}
|
||||
case structs.ServiceNodes:
|
||||
for _, entry := range v {
|
||||
entry.Address = translateAddress(config, dc,
|
||||
entry.Address, entry.TaggedAddresses)
|
||||
}
|
||||
default:
|
||||
panic(fmt.Errorf("Unhandled type passed to address translator: %#v", subj))
|
||||
|
||||
}
|
||||
}
|
|
@ -678,9 +678,11 @@ func (s *StateStore) ensureServiceTxn(tx *memdb.Txn, idx uint64, watches *DumbWa
|
|||
return fmt.Errorf("failed service lookup: %s", err)
|
||||
}
|
||||
|
||||
// Create the service node entry and populate the indexes. We leave the
|
||||
// address blank and fill that in on the way out during queries.
|
||||
entry := svc.ToServiceNode(node, "")
|
||||
// Create the service node entry and populate the indexes. Note that
|
||||
// conversion doesn't populate any of the node-specific information
|
||||
// (Address and TaggedAddresses). That's always populated when we read
|
||||
// from the state store.
|
||||
entry := svc.ToServiceNode(node)
|
||||
if existing != nil {
|
||||
entry.CreateIndex = existing.(*structs.ServiceNode).CreateIndex
|
||||
entry.ModifyIndex = idx
|
||||
|
@ -830,16 +832,23 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
|
|||
var results structs.ServiceNodes
|
||||
for _, sn := range services {
|
||||
// Note that we have to clone here because we don't want to
|
||||
// modify the address field on the object in the database,
|
||||
// modify the node-related fields on the object in the database,
|
||||
// which is what we are referencing.
|
||||
s := sn.Clone()
|
||||
s := sn.PartialClone()
|
||||
|
||||
// Fill in the address of the node.
|
||||
// Grab the corresponding node record.
|
||||
n, err := tx.First("nodes", "id", sn.Node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
s.Address = n.(*structs.Node).Address
|
||||
|
||||
// Populate the node-related fields. The tagged addresses may be
|
||||
// used by agents to perform address translation if they are
|
||||
// configured to do that.
|
||||
node := n.(*structs.Node)
|
||||
s.Address = node.Address
|
||||
s.TaggedAddresses = node.TaggedAddresses
|
||||
|
||||
results = append(results, s)
|
||||
}
|
||||
return results, nil
|
||||
|
|
|
@ -260,10 +260,15 @@ type Nodes []*Node
|
|||
// Maps service name to available tags
|
||||
type Services map[string][]string
|
||||
|
||||
// ServiceNode represents a node that is part of a service
|
||||
// ServiceNode represents a node that is part of a service. Address and
|
||||
// TaggedAddresses are node-related fields that are always empty in the state
|
||||
// store and are filled in on the way out by parseServiceNodes(). This is also
|
||||
// why PartialClone() skips them, because we know they are blank already so it
|
||||
// would be a waste of time to copy them.
|
||||
type ServiceNode struct {
|
||||
Node string
|
||||
Address string
|
||||
TaggedAddresses map[string]string
|
||||
ServiceID string
|
||||
ServiceName string
|
||||
ServiceTags []string
|
||||
|
@ -274,14 +279,16 @@ type ServiceNode struct {
|
|||
RaftIndex
|
||||
}
|
||||
|
||||
// Clone returns a clone of the given service node.
|
||||
func (s *ServiceNode) Clone() *ServiceNode {
|
||||
// PartialClone() returns a clone of the given service node, minus the node-
|
||||
// related fields that get filled in later, Address and TaggedAddresses.
|
||||
func (s *ServiceNode) PartialClone() *ServiceNode {
|
||||
tags := make([]string, len(s.ServiceTags))
|
||||
copy(tags, s.ServiceTags)
|
||||
|
||||
return &ServiceNode{
|
||||
Node: s.Node,
|
||||
Address: s.Address,
|
||||
Node: s.Node,
|
||||
// Skip Address, see above.
|
||||
// Skip TaggedAddresses, see above.
|
||||
ServiceID: s.ServiceID,
|
||||
ServiceName: s.ServiceName,
|
||||
ServiceTags: tags,
|
||||
|
@ -343,10 +350,11 @@ func (s *NodeService) IsSame(other *NodeService) bool {
|
|||
}
|
||||
|
||||
// ToServiceNode converts the given node service to a service node.
|
||||
func (s *NodeService) ToServiceNode(node, address string) *ServiceNode {
|
||||
func (s *NodeService) ToServiceNode(node string) *ServiceNode {
|
||||
return &ServiceNode{
|
||||
Node: node,
|
||||
Address: address,
|
||||
Node: node,
|
||||
// Skip Address, see ServiceNode definition.
|
||||
// Skip TaggedAddresses, see ServiceNode definition.
|
||||
ServiceID: s.ID,
|
||||
ServiceName: s.Service,
|
||||
ServiceTags: s.Tags,
|
||||
|
|
|
@ -108,8 +108,11 @@ func TestStructs_ACL_IsSame(t *testing.T) {
|
|||
// testServiceNode gives a fully filled out ServiceNode instance.
|
||||
func testServiceNode() *ServiceNode {
|
||||
return &ServiceNode{
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
Node: "node1",
|
||||
Address: "127.0.0.1",
|
||||
TaggedAddresses: map[string]string{
|
||||
"hello": "world",
|
||||
},
|
||||
ServiceID: "service1",
|
||||
ServiceName: "dogs",
|
||||
ServiceTags: []string{"prod", "v1"},
|
||||
|
@ -123,10 +126,20 @@ func testServiceNode() *ServiceNode {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStructs_ServiceNode_Clone(t *testing.T) {
|
||||
func TestStructs_ServiceNode_PartialClone(t *testing.T) {
|
||||
sn := testServiceNode()
|
||||
|
||||
clone := sn.Clone()
|
||||
clone := sn.PartialClone()
|
||||
|
||||
// Make sure the parts that weren't supposed to be cloned didn't get
|
||||
// copied over, then zero-value them out so we can do a DeepEqual() on
|
||||
// the rest of the contents.
|
||||
if clone.Address != "" || len(clone.TaggedAddresses) != 0 {
|
||||
t.Fatalf("bad: %v", clone)
|
||||
}
|
||||
|
||||
sn.Address = ""
|
||||
sn.TaggedAddresses = nil
|
||||
if !reflect.DeepEqual(sn, clone) {
|
||||
t.Fatalf("bad: %v", clone)
|
||||
}
|
||||
|
@ -140,7 +153,12 @@ func TestStructs_ServiceNode_Clone(t *testing.T) {
|
|||
func TestStructs_ServiceNode_Conversions(t *testing.T) {
|
||||
sn := testServiceNode()
|
||||
|
||||
sn2 := sn.ToNodeService().ToServiceNode("node1", "127.0.0.1")
|
||||
sn2 := sn.ToNodeService().ToServiceNode("node1")
|
||||
|
||||
// These two fields get lost in the conversion, so we have to zero-value
|
||||
// them out before we do the compare.
|
||||
sn.Address = ""
|
||||
sn.TaggedAddresses = nil
|
||||
if !reflect.DeepEqual(sn, sn2) {
|
||||
t.Fatalf("bad: %v", sn2)
|
||||
}
|
||||
|
|
|
@ -750,10 +750,20 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
|
|||
|
||||
* <a name="translate_wan_addrs"</a><a href="#translate_wan_addrs">`translate_wan_addrs`</a> If
|
||||
set to true, Consul will prefer a node's configured <a href="#_advertise-wan">WAN address</a>
|
||||
when servicing DNS requests for a node in a remote datacenter. This allows the node to be
|
||||
reached within its own datacenter using its local address, and reached from other datacenters
|
||||
using its WAN address, which is useful in hybrid setups with mixed networks. This is disabled
|
||||
by default.
|
||||
when servicing DNS and HTTP requests for a node in a remote datacenter. This allows the node to
|
||||
be reached within its own datacenter using its local address, and reached from other datacenters
|
||||
using its WAN address, which is useful in hybrid setups with mixed networks. This is disabled by
|
||||
default.
|
||||
<br>
|
||||
<br>
|
||||
In addition to addresses in DNS responses, Consul will also translate node addresses in responses
|
||||
to the following HTTP endpoints when querying a remote datacenter:
|
||||
<br>
|
||||
* [`/v1/catalog/nodes`](/docs/agent/http/catalog.html#catalog_nodes)
|
||||
* [`/v1/catalog/node/<node>`](/docs/agent/http/catalog.html#catalog_node)
|
||||
* [`/v1/catalog/service/<service>`](/docs/agent/http/catalog.html#catalog_service)
|
||||
* [`/v1/health/service/<service>`](/docs/agent/http/health.html#health_service)
|
||||
* [`/v1/query/<query or name>/execute`](/docs/agent/http/query.html#execute)
|
||||
|
||||
* <a name="ui"></a><a href="#ui">`ui`</a> - Equivalent to the [`-ui`](#_ui)
|
||||
command-line flag.
|
||||
|
@ -764,20 +774,23 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
|
|||
* <a name="unix_sockets"></a><a href="#unix_sockets">`unix_sockets`</a> - This
|
||||
allows tuning the ownership and permissions of the
|
||||
Unix domain socket files created by Consul. Domain sockets are only used if
|
||||
the HTTP or RPC addresses are configured with the `unix://` prefix. The
|
||||
following options are valid within this construct and apply globally to all
|
||||
sockets created by Consul:
|
||||
the HTTP or RPC addresses are configured with the `unix://` prefix.
|
||||
<br>
|
||||
* `user` - The name or ID of the user who will own the socket file.
|
||||
* `group` - The group ID ownership of the socket file. Note that this option
|
||||
currently only supports numeric IDs.
|
||||
* `mode` - The permission bits to set on the file.
|
||||
<br>
|
||||
It is important to note that this option may have different effects on
|
||||
different operating systems. Linux generally observes socket file permissions
|
||||
while many BSD variants ignore permissions on the socket file itself. It is
|
||||
important to test this feature on your specific distribution. This feature is
|
||||
currently not functional on Windows hosts.
|
||||
<br>
|
||||
<br>
|
||||
The following options are valid within this construct and apply globally to all
|
||||
sockets created by Consul:
|
||||
<br>
|
||||
* `user` - The name or ID of the user who will own the socket file.
|
||||
* `group` - The group ID ownership of the socket file. Note that this option
|
||||
currently only supports numeric IDs.
|
||||
* `mode` - The permission bits to set on the file.
|
||||
|
||||
* <a name="verify_incoming"></a><a href="#verify_incoming">`verify_incoming`</a> - If
|
||||
set to true, Consul requires that all incoming
|
||||
|
|
|
@ -38,6 +38,13 @@ making them unable to properly serve requests for prepared queries using the
|
|||
feature. It is recommended that all agents be running version 0.7.0 prior to
|
||||
using this feature.
|
||||
|
||||
#### WAN Address Translation in HTTP Endpoints
|
||||
|
||||
Consul version 0.7 added support for translating WAN addresses in certain
|
||||
[HTTP endpoints](/docs/agent/options.html#translate_wan_addrs). The servers
|
||||
and the agents need to be running version 0.7.0 or later in order to use this
|
||||
feature.
|
||||
|
||||
## Consul 0.6.4
|
||||
|
||||
Consul 0.6.4 made some substantial changes to how ACLs work with prepared
|
||||
|
|
Loading…
Reference in New Issue