diff --git a/command/agent/dns.go b/command/agent/dns.go
index 2a8d8dd5c3..9e2357dea4 100644
--- a/command/agent/dns.go
+++ b/command/agent/dns.go
@@ -598,6 +598,15 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, req,
Token: d.agent.config.ACLToken,
AllowStale: d.config.AllowStale,
},
+
+ // Always pass the local agent through. In the DNS interface, there
+ // is no provision for passing additional query parameters, so we
+ // send the local agent's data through to allow distance sorting
+ // relative to ourself on the server side.
+ Agent: structs.QuerySource{
+ Datacenter: d.agent.config.Datacenter,
+ Node: d.agent.config.NodeName,
+ },
}
// TODO (slackpad) - What's a safe limit we can set here? It seems like
diff --git a/command/agent/dns_test.go b/command/agent/dns_test.go
index 863a0bfefb..1182d9172e 100644
--- a/command/agent/dns_test.go
+++ b/command/agent/dns_test.go
@@ -3166,3 +3166,37 @@ func TestDNS_InvalidQueries(t *testing.T) {
}
}
}
+
+func TestDNS_PreparedQuery_AgentSource(t *testing.T) {
+ dir, srv := makeDNSServer(t)
+ defer os.RemoveAll(dir)
+ defer srv.agent.Shutdown()
+
+ testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
+
+ m := MockPreparedQuery{}
+ if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
+ // Check that the agent inserted its self-name and datacenter to
+ // the RPC request body.
+ if args.Agent.Datacenter != srv.agent.config.Datacenter ||
+ args.Agent.Node != srv.agent.config.NodeName {
+ t.Fatalf("bad: %#v", args.Agent)
+ }
+ return nil
+ }
+
+ {
+ m := new(dns.Msg)
+ m.SetQuestion("foo.query.consul.", dns.TypeSRV)
+
+ c := new(dns.Client)
+ addr, _ := srv.agent.config.ClientListener("", srv.agent.config.Ports.DNS)
+ if _, _, err := c.Exchange(m, addr.String()); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ }
+}
diff --git a/command/agent/prepared_query_endpoint.go b/command/agent/prepared_query_endpoint.go
index bf643f7c26..1a6ff6d72e 100644
--- a/command/agent/prepared_query_endpoint.go
+++ b/command/agent/prepared_query_endpoint.go
@@ -96,6 +96,10 @@ func parseLimit(req *http.Request, limit *int) error {
func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
+ Agent: structs.QuerySource{
+ Node: s.agent.config.NodeName,
+ Datacenter: s.agent.config.Datacenter,
+ },
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
@@ -131,6 +135,10 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := structs.PreparedQueryExecuteRequest{
QueryIDOrName: id,
+ Agent: structs.QuerySource{
+ Node: s.agent.config.NodeName,
+ Datacenter: s.agent.config.Datacenter,
+ },
}
s.parseSource(req, &args.Source)
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
diff --git a/command/agent/prepared_query_endpoint_test.go b/command/agent/prepared_query_endpoint_test.go
index 8997de05f3..ff757e0acf 100644
--- a/command/agent/prepared_query_endpoint_test.go
+++ b/command/agent/prepared_query_endpoint_test.go
@@ -286,6 +286,10 @@ func TestPreparedQuery_Execute(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
+ Agent: structs.QuerySource{
+ Datacenter: srv.agent.config.Datacenter,
+ Node: srv.agent.config.NodeName,
+ },
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
@@ -323,6 +327,38 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
})
+ // Ensure the proper params are set when no special args are passed
+ httpTest(t, func(srv *HTTPServer) {
+ m := MockPreparedQuery{}
+ if err := srv.agent.InjectEndpoint("PreparedQuery", &m); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ m.executeFn = func(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error {
+ if args.Source.Node != "" {
+ t.Fatalf("expect node to be empty, got %q", args.Source.Node)
+ }
+ expect := structs.QuerySource{
+ Datacenter: srv.agent.config.Datacenter,
+ Node: srv.agent.config.NodeName,
+ }
+ if !reflect.DeepEqual(args.Agent, expect) {
+ t.Fatalf("expect: %#v\nactual: %#v", expect, args.Agent)
+ }
+ return nil
+ }
+
+ req, err := http.NewRequest("GET", "/v1/query/my-id/execute", nil)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ resp := httptest.NewRecorder()
+ if _, err := srv.PreparedQuerySpecific(resp, req); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ })
+
httpTest(t, func(srv *HTTPServer) {
body := bytes.NewBuffer(nil)
req, err := http.NewRequest("GET", "/v1/query/not-there/execute", body)
@@ -357,6 +393,10 @@ func TestPreparedQuery_Explain(t *testing.T) {
Datacenter: "dc1",
Node: "my-node",
},
+ Agent: structs.QuerySource{
+ Datacenter: srv.agent.config.Datacenter,
+ Node: srv.agent.config.NodeName,
+ },
QueryOptions: structs.QueryOptions{
Token: "my-token",
RequireConsistent: true,
diff --git a/consul/prepared_query/walk_test.go b/consul/prepared_query/walk_test.go
index db9a75c1cb..05294e3b65 100644
--- a/consul/prepared_query/walk_test.go
+++ b/consul/prepared_query/walk_test.go
@@ -20,6 +20,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
Failover: structs.QueryDatacenterOptions{
Datacenters: []string{"dc1", "dc2"},
},
+ Near: "_agent",
Tags: []string{"tag1", "tag2", "tag3"},
}
if err := walk(service, fn); err != nil {
@@ -30,6 +31,7 @@ func TestWalk_ServiceQuery(t *testing.T) {
".Service:the-service",
".Failover.Datacenters[0]:dc1",
".Failover.Datacenters[1]:dc2",
+ ".Near:_agent",
".Tags[0]:tag1",
".Tags[1]:tag2",
".Tags[2]:tag3",
diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go
index d30b6c10dd..5f7d788943 100644
--- a/consul/prepared_query_endpoint.go
+++ b/consul/prepared_query_endpoint.go
@@ -368,7 +368,25 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
// Shuffle the results in case coordinates are not available if they
// requested an RTT sort.
reply.Nodes.Shuffle()
- if err := p.srv.sortNodesByDistanceFrom(args.Source, reply.Nodes); err != nil {
+
+ // Build the query source. This can be provided by the client, or by
+ // the prepared query. Client-specified takes priority.
+ qs := args.Source
+ if qs.Datacenter == "" {
+ qs.Datacenter = args.Agent.Datacenter
+ }
+ if query.Service.Near != "" && qs.Node == "" {
+ qs.Node = query.Service.Near
+ }
+
+ // Respect the magic "_agent" flag.
+ if qs.Node == "_agent" {
+ qs.Node = args.Agent.Node
+ }
+
+ // Perform the distance sort
+ err = p.srv.sortNodesByDistanceFrom(qs, reply.Nodes)
+ if err != nil {
return err
}
diff --git a/consul/prepared_query_endpoint_test.go b/consul/prepared_query_endpoint_test.go
index cb10eb8f8c..5630f26c61 100644
--- a/consul/prepared_query_endpoint_test.go
+++ b/consul/prepared_query_endpoint_test.go
@@ -1607,6 +1607,197 @@ func TestPreparedQuery_Execute(t *testing.T) {
t.Fatalf("unique shuffle ratio too low: %d/100", len(uniques))
}
+ // Set the query to return results nearest to node3. This is the only
+ // node with coordinates, and it carries the service we are asking for,
+ // so node3 should always show up first.
+ query.Op = structs.PreparedQueryUpdate
+ query.Query.Service.Near = "node3"
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Now run the query and make sure the sort looks right.
+ {
+ req := structs.PreparedQueryExecuteRequest{
+ Agent: structs.QuerySource{
+ Datacenter: "dc1",
+ Node: "node3",
+ },
+ Datacenter: "dc1",
+ QueryIDOrName: query.Query.ID,
+ QueryOptions: structs.QueryOptions{Token: execToken},
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+
+ for i := 0; i < 10; i++ {
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if n := len(reply.Nodes); n != 10 {
+ t.Fatalf("expect 10 nodes, got: %d", n)
+ }
+ if node := reply.Nodes[0].Node.Node; node != "node3" {
+ t.Fatalf("expect node3 first, got: %q", node)
+ }
+ }
+ }
+
+ // Query again, but this time set a client-supplied query source. This
+ // proves that we allow overriding the baked-in value with ?near.
+ {
+ // Set up the query with a non-existent node. This will cause the
+ // nodes to be shuffled if the passed node is respected, proving
+ // that we allow the override to happen.
+ req := structs.PreparedQueryExecuteRequest{
+ Source: structs.QuerySource{
+ Datacenter: "dc1",
+ Node: "foo",
+ },
+ Agent: structs.QuerySource{
+ Datacenter: "dc1",
+ Node: "node3",
+ },
+ Datacenter: "dc1",
+ QueryIDOrName: query.Query.ID,
+ QueryOptions: structs.QueryOptions{Token: execToken},
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+
+ shuffled := false
+ for i := 0; i < 10; i++ {
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if n := len(reply.Nodes); n != 10 {
+ t.Fatalf("expect 10 nodes, got: %d", n)
+ }
+ if node := reply.Nodes[0].Node.Node; node != "node3" {
+ shuffled = true
+ break
+ }
+ }
+
+ if !shuffled {
+ t.Fatalf("expect nodes to be shuffled")
+ }
+ }
+
+ // Bake the magic "_agent" flag into the query.
+ query.Query.Service.Near = "_agent"
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Check that we sort the local agent first when the magic flag is set.
+ {
+ req := structs.PreparedQueryExecuteRequest{
+ Agent: structs.QuerySource{
+ Datacenter: "dc1",
+ Node: "node3",
+ },
+ Datacenter: "dc1",
+ QueryIDOrName: query.Query.ID,
+ QueryOptions: structs.QueryOptions{Token: execToken},
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+
+ for i := 0; i < 10; i++ {
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if n := len(reply.Nodes); n != 10 {
+ t.Fatalf("expect 10 nodes, got: %d", n)
+ }
+ if node := reply.Nodes[0].Node.Node; node != "node3" {
+ t.Fatalf("expect node3 first, got: %q", node)
+ }
+ }
+ }
+
+ // Check that the query isn't just sorting "node3" first because we
+ // provided it in the Agent query source. Proves that we use the
+ // Agent source when the magic "_agent" flag is passed.
+ {
+ req := structs.PreparedQueryExecuteRequest{
+ Agent: structs.QuerySource{
+ Datacenter: "dc1",
+ Node: "foo",
+ },
+ Datacenter: "dc1",
+ QueryIDOrName: query.Query.ID,
+ QueryOptions: structs.QueryOptions{Token: execToken},
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+
+ // Expect the set to be shuffled since we have no coordinates
+ // on the "foo" node.
+ shuffled := false
+ for i := 0; i < 10; i++ {
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if n := len(reply.Nodes); n != 10 {
+ t.Fatalf("expect 10 nodes, got: %d", n)
+ }
+ if node := reply.Nodes[0].Node.Node; node != "node3" {
+ shuffled = true
+ break
+ }
+ }
+
+ if !shuffled {
+ t.Fatal("expect nodes to be shuffled")
+ }
+ }
+
+ // Shuffles if the response comes from a non-local DC. Proves that the
+ // agent query source does not interfere with the order.
+ {
+ req := structs.PreparedQueryExecuteRequest{
+ Source: structs.QuerySource{
+ Datacenter: "dc2",
+ Node: "node3",
+ },
+ Agent: structs.QuerySource{
+ Datacenter: "dc1",
+ Node: "node3",
+ },
+ Datacenter: "dc1",
+ QueryIDOrName: query.Query.ID,
+ QueryOptions: structs.QueryOptions{Token: execToken},
+ }
+
+ var reply structs.PreparedQueryExecuteResponse
+
+ shuffled := false
+ for i := 0; i < 10; i++ {
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if n := len(reply.Nodes); n != 10 {
+ t.Fatalf("expect 10 nodes, got: %d", n)
+ }
+ if reply.Nodes[0].Node.Node != "node3" {
+ shuffled = true
+ break
+ }
+ }
+
+ if !shuffled {
+ t.Fatal("expect node shuffle for remote results")
+ }
+ }
+
+ // Un-bake the near parameter.
+ query.Query.Service.Near = ""
+ if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
// Update the health of a node to mark it critical.
setHealth := func(node string, health string) {
req := structs.RegisterRequest{
@@ -1683,7 +1874,6 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
// Make the query more picky so it excludes warning nodes.
- query.Op = structs.PreparedQueryUpdate
query.Query.Service.OnlyPassing = true
if err := msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &query, &query.Query.ID); err != nil {
t.Fatalf("err: %v", err)
diff --git a/consul/structs/prepared_query.go b/consul/structs/prepared_query.go
index b1b20c9ed3..5e9c31847b 100644
--- a/consul/structs/prepared_query.go
+++ b/consul/structs/prepared_query.go
@@ -34,6 +34,12 @@ type ServiceQuery struct {
// discarded)
OnlyPassing bool
+ // Near allows the query to always prefer the node nearest the given
+ // node. If the node does not exist, results are returned in their
+ // normal randomly-shuffled order. Supplying the magic "_agent" value
+ // is supported to sort near the agent which initiated the request.
+ Near string
+
// Tags are a set of required and/or disallowed tags. If a tag is in
// this list it must be present. If the tag is preceded with "!" then
// it is disallowed.
@@ -177,6 +183,10 @@ type PreparedQueryExecuteRequest struct {
// network coordinates.
Source QuerySource
+ // Agent is used to carry around a reference to the agent which initiated
+ // the execute request. Used to distance-sort relative to the local node.
+ Agent QuerySource
+
// QueryOptions (unfortunately named here) controls the consistency
// settings for the query lookup itself, as well as the service lookups.
QueryOptions
diff --git a/website/source/docs/agent/http/query.html.markdown b/website/source/docs/agent/http/query.html.markdown
index bad3ae1288..d2737384d9 100644
--- a/website/source/docs/agent/http/query.html.markdown
+++ b/website/source/docs/agent/http/query.html.markdown
@@ -70,6 +70,7 @@ query, like this example:
"Name": "my-query",
"Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
"Token": "",
+ "Near": "node1",
"Service": {
"Service": "redis",
"Failover": {
@@ -114,6 +115,16 @@ attribute which can be set on functions. This change in effect moves Consul
from using `SECURITY DEFINER` by default to `SECURITY INVOKER` by default for
new Prepared Queries.
+
+`Near` allows specifying a particular node to sort near based on distance
+sorting using [Network Coordinates](/docs/internals/coordinates.html). The
+nearest instance to the specified node will be returned first, and subsequent
+nodes in the response will be sorted in ascending order of estimated round-trip
+times. If the node given does not exist, the nodes in the response will
+be shuffled. Using the magic `_agent` value is supported, and will automatically
+return results nearest the agent servicing the request. If unspecified, the
+response will be shuffled by default.
+
The set of fields inside the `Service` structure define the query's behavior.
`Service` is the name of the service to query. This is required.
@@ -365,8 +376,9 @@ blocking queries, but it does support all consistency modes.
Adding the optional "?near=" parameter with a node name will sort the resulting
list in ascending order based on the estimated round trip time from that node.
Passing "?near=_agent" will use the agent's node for the sort. If this is not
-present, then the nodes will be shuffled randomly and will be in a different
-order each time the query is executed.
+present, the default behavior will shuffle the nodes randomly each time the
+query is executed. Passing this option will override the built-in
+near parameter of a prepared query, if present.
An optional "?limit=" parameter can be used to limit the size of the list to
the given number of nodes. This is applied after any sorting or shuffling.
diff --git a/website/source/docs/upgrade-specific.html.markdown b/website/source/docs/upgrade-specific.html.markdown
index d55e397b47..1a97794a2a 100644
--- a/website/source/docs/upgrade-specific.html.markdown
+++ b/website/source/docs/upgrade-specific.html.markdown
@@ -14,6 +14,21 @@ details provided for their upgrades as a result of new features or changed
behavior. This page is used to document those details separately from the
standard upgrade flow.
+## Consul 0.7
+
+Consul version 0.7 adds a feature which allows prepared queries to store a
+["Near" parameter](/docs/agent/http/query.html#near) in the query definition
+itself. This feature enables using the distance sorting features of prepared
+queries without explicitly providing the node to sort near in requests, but
+requires the agent servicing a request to send additional information about
+itself to the Consul servers when executing the prepared query. Agents prior
+to 0.7.0 do not send this information, which means they are unable to properly
+execute prepared queries configured with a `Near` parameter. Similarly, any
+server nodes prior to version 0.7.0 are unable to store the `Near` parameter,
+making them unable to properly serve requests for prepared queries using the
+feature. It is recommended that all agents be running version 0.7.0 prior to
+using this feature.
+
## Consul 0.6.4
Consul 0.6.4 made some substantial changes to how ACLs work with prepared