diff --git a/consul/prepared_query_endpoint.go b/consul/prepared_query_endpoint.go index aff1a1b2a1..b3863f71f2 100644 --- a/consul/prepared_query_endpoint.go +++ b/consul/prepared_query_endpoint.go @@ -305,7 +305,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // by the query setup. if len(reply.Nodes) == 0 { wrapper := &queryServerWrapper{p.srv} - if err := queryFailover(wrapper, query, args, reply); err != nil { + if err := queryFailover(wrapper, query, args.Limit, args.QueryOptions, reply); err != nil { return err } } @@ -488,7 +488,7 @@ func (q *queryServerWrapper) ForwardDC(method, dc string, args interface{}, repl // queryFailover runs an algorithm to determine which DCs to try and then calls // them to try to locate alternative services. func queryFailover(q queryServer, query *structs.PreparedQuery, - args *structs.PreparedQueryExecuteRequest, + limit int, options structs.QueryOptions, reply *structs.PreparedQueryExecuteResponse) error { // Pull the list of other DCs. This is sorted by RTT in case the user @@ -543,10 +543,10 @@ func queryFailover(q queryServer, query *structs.PreparedQuery, remote := &structs.PreparedQueryExecuteRemoteRequest{ Datacenter: dc, Query: *query, - Limit: args.Limit, - QueryOptions: args.QueryOptions, + Limit: limit, + QueryOptions: options, } - if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, &remote, &reply); err != nil { + if err := q.ForwardDC("PreparedQuery.ExecuteRemote", dc, remote, reply); err != nil { q.GetLogger().Printf("[WARN] consul.prepared_query: Failed querying for service '%s' in datacenter '%s': %s", query.Service.Service, dc, err) continue } diff --git a/consul/prepared_query_endpoint_test.go b/consul/prepared_query_endpoint_test.go index e8c379eb03..c28eaf063a 100644 --- a/consul/prepared_query_endpoint_test.go +++ b/consul/prepared_query_endpoint_test.go @@ -1,7 +1,9 @@ package consul import ( + "bytes" "fmt" + "log" "net/rpc" "os" "reflect" @@ -1800,3 +1802,380 @@ func TestPreparedQuery_Wrapper(t *testing.T) { t.Fatalf("err: %v", err) } } + +type mockQueryServer struct { + Datacenters []string + DatacentersError error + QueryLog []string + QueryFn func(dc string, args interface{}, reply interface{}) error + Logger *log.Logger + LogBuffer *bytes.Buffer +} + +func (m *mockQueryServer) JoinQueryLog() string { + return strings.Join(m.QueryLog, "|") +} + +func (m *mockQueryServer) GetLogger() *log.Logger { + if m.Logger == nil { + m.LogBuffer = new(bytes.Buffer) + m.Logger = log.New(m.LogBuffer, "", 0) + } + return m.Logger +} + +func (m *mockQueryServer) GetOtherDatacentersByDistance() ([]string, error) { + return m.Datacenters, m.DatacentersError +} + +func (m *mockQueryServer) ForwardDC(method, dc string, args interface{}, reply interface{}) error { + m.QueryLog = append(m.QueryLog, fmt.Sprintf("%s:%s", dc, method)) + if ret, ok := reply.(*structs.PreparedQueryExecuteResponse); ok { + ret.Datacenter = dc + } + if m.QueryFn != nil { + return m.QueryFn(dc, args, reply) + } else { + return nil + } +} + +func TestPreparedQuery_queryFailover(t *testing.T) { + query := &structs.PreparedQuery{ + Service: structs.ServiceQuery{ + Failover: structs.QueryDatacenterOptions{ + NearestN: 0, + Datacenters: []string{""}, + }, + }, + } + + nodes := func() structs.CheckServiceNodes { + return structs.CheckServiceNodes{ + structs.CheckServiceNode{ + Node: &structs.Node{Node: "node1"}, + }, + structs.CheckServiceNode{ + Node: &structs.Node{Node: "node2"}, + }, + structs.CheckServiceNode{ + Node: &structs.Node{Node: "node3"}, + }, + } + } + + // Datacenters are available but the query doesn't use them. + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 { + t.Fatalf("bad: %v", reply) + } + } + + // Make it fail to get datacenters. + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + DatacentersError: fmt.Errorf("XXX"), + } + + var reply structs.PreparedQueryExecuteResponse + err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply) + if err == nil || !strings.Contains(err.Error(), "XXX") { + t.Fatalf("bad: %v", err) + } + if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 { + t.Fatalf("bad: %v", reply) + } + } + + // The query wants to use other datacenters but none are available. + query.Service.Failover.NearestN = 3 + { + mock := &mockQueryServer{ + Datacenters: []string{}, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 0 || reply.Datacenter != "" || reply.Failovers != 0 { + t.Fatalf("bad: %v", reply) + } + } + + // Try the first three nearest datacenters, first one has the data. + query.Service.Failover.NearestN = 3 + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "dc1" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "dc1" || reply.Failovers != 1 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } + + // Try the first three nearest datacenters, last one has the data. + query.Service.Failover.NearestN = 3 + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "dc3" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "dc3" || reply.Failovers != 3 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } + + // Try the first four nearest datacenters, nobody has the data. + query.Service.Failover.NearestN = 4 + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 0 || + reply.Datacenter != "xxx" || reply.Failovers != 4 { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } + + // Try the first two nearest datacenters, plus a user-specified one that + // has the data. + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"dc4"} + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "dc4" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "dc4" || reply.Failovers != 3 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } + + // Add in a hard-coded value that overlaps with the nearest list. + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"dc4", "dc1"} + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "dc4" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "dc4" || reply.Failovers != 3 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } + + // Now add a bogus user-defined one to the mix. + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"nope", "dc4", "dc1"} + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "dc4" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "dc4" || reply.Failovers != 3 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + if !strings.Contains(mock.LogBuffer.String(), "Skipping unknown datacenter") { + t.Fatalf("bad: %s", mock.LogBuffer.String()) + } + } + + // Same setup as before but dc1 is going to return an error and should + // get skipped over, still yielding data from dc4 which comes later. + query.Service.Failover.NearestN = 2 + query.Service.Failover.Datacenters = []string{"dc4", "dc1"} + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "dc1" { + return fmt.Errorf("XXX") + } else if dc == "dc4" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "dc4" || reply.Failovers != 3 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc1:PreparedQuery.ExecuteRemote|dc2:PreparedQuery.ExecuteRemote|dc4:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + if !strings.Contains(mock.LogBuffer.String(), "Failed querying") { + t.Fatalf("bad: %s", mock.LogBuffer.String()) + } + } + + // Just use a hard-coded list and now xxx has the data. + query.Service.Failover.NearestN = 0 + query.Service.Failover.Datacenters = []string{"dc3", "xxx"} + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "xxx" { + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 0, structs.QueryOptions{}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "xxx" || reply.Failovers != 2 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "dc3:PreparedQuery.ExecuteRemote|xxx:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } + + // Make sure the limit and query options are plumbed through. + query.Service.Failover.NearestN = 0 + query.Service.Failover.Datacenters = []string{"xxx"} + { + mock := &mockQueryServer{ + Datacenters: []string{"dc1", "dc2", "dc3", "xxx", "dc4"}, + QueryFn: func(dc string, args interface{}, reply interface{}) error { + inp := args.(*structs.PreparedQueryExecuteRemoteRequest) + ret := reply.(*structs.PreparedQueryExecuteResponse) + if dc == "xxx" { + if inp.Limit != 5 { + t.Fatalf("bad: %d", inp.Limit) + } + if inp.RequireConsistent != true { + t.Fatalf("bad: %v", inp.RequireConsistent) + } + ret.Nodes = nodes() + } + return nil + }, + } + + var reply structs.PreparedQueryExecuteResponse + if err := queryFailover(mock, query, 5, structs.QueryOptions{RequireConsistent: true}, &reply); err != nil { + t.Fatalf("err: %v", err) + } + if len(reply.Nodes) != 3 || + reply.Datacenter != "xxx" || reply.Failovers != 1 || + !reflect.DeepEqual(reply.Nodes, nodes()) { + t.Fatalf("bad: %v", reply) + } + if queries := mock.JoinQueryLog(); queries != "xxx:PreparedQuery.ExecuteRemote" { + t.Fatalf("bad: %s", queries) + } + } +}