Completes the rebase of network coordinates onto the new memdb.

pull/1331/head
James Phillips 2015-10-23 15:19:14 -07:00
parent cef9402ab9
commit ecd3a1d1d2
16 changed files with 692 additions and 210 deletions

View File

@@ -17,7 +17,7 @@ func coordinateDisabled(resp http.ResponseWriter, req *http.Request) (interface{
 // sorter wraps a coordinate list and implements the sort.Interface to sort by
 // node name.
 type sorter struct {
-    coordinates []structs.Coordinate
+    coordinates structs.Coordinates
 }
 
 // See sort.Interface.

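The sorter's remaining sort.Interface methods are untouched by this hunk (hence the lone "// See sort.Interface." context line). For readers without the file handy, here is a minimal sketch of how such a by-node-name sorter is typically completed; this is reconstructed for illustration, not copied from the commit:

// Sketch only: Len/Swap/Less bodies are assumed, not part of this diff.
func (s *sorter) Len() int {
    return len(s.coordinates)
}

func (s *sorter) Swap(i, j int) {
    s.coordinates[i], s.coordinates[j] = s.coordinates[j], s.coordinates[i]
}

func (s *sorter) Less(i, j int) bool {
    return s.coordinates[i].Node < s.coordinates[j].Node
}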
View File

@@ -48,6 +48,20 @@ func TestCoordinate_Nodes(t *testing.T) {
 	testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
 
+	// Register the nodes.
+	nodes := []string{"foo", "bar"}
+	for _, node := range nodes {
+		req := structs.RegisterRequest{
+			Datacenter: "dc1",
+			Node:       node,
+			Address:    "127.0.0.1",
+		}
+		var reply struct{}
+		if err := srv.agent.RPC("Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+	}
+
 	// Send some coordinates for a few nodes, waiting a little while for the
 	// batch update to run.
 	arg1 := structs.CoordinateUpdateRequest{
@@ -82,7 +96,7 @@ func TestCoordinate_Nodes(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	}
 
-	coordinates := obj.([]structs.Coordinate)
+	coordinates := obj.(structs.Coordinates)
 	if len(coordinates) != 2 ||
 		coordinates[0].Node != "bar" ||
 		coordinates[1].Node != "foo" {

View File

@@ -819,15 +819,18 @@ func TestAgent_sendCoordinate(t *testing.T) {
 	time.Sleep(2 * conf.ConsulConfig.CoordinateUpdatePeriod)
 
 	// Make sure the coordinate is present.
-	req := structs.NodeSpecificRequest{
+	req := structs.DCSpecificRequest{
 		Datacenter: agent.config.Datacenter,
-		Node:       agent.config.NodeName,
 	}
-	var reply structs.IndexedCoordinate
-	if err := agent.RPC("Coordinate.Get", &req, &reply); err != nil {
-		t.Fatalf("err: %v", err)
+	var reply structs.IndexedCoordinates
+	if err := agent.RPC("Coordinate.ListNodes", &req, &reply); err != nil {
+		t.Fatalf("err: %s", err)
 	}
-	if reply.Coord == nil {
-		t.Fatalf("should get a coordinate")
+	if len(reply.Coordinates) != 1 {
+		t.Fatalf("expected a coordinate: %v", reply)
+	}
+	coord := reply.Coordinates[0]
+	if coord.Node != agent.config.NodeName || coord.Coord == nil {
+		t.Fatalf("bad: %v", coord)
 	}
 }

View File

@@ -54,25 +54,39 @@ func TestRTTCommand_Run_LAN(t *testing.T) {
 	c2 := c1.Clone()
 	c2.Vec[0] = 0.123
 	dist_str := fmt.Sprintf("%.3f ms", c1.DistanceTo(c2).Seconds()*1000.0)
 
-	req1 := structs.CoordinateUpdateRequest{
-		Datacenter: a.config.Datacenter,
-		Node:       a.config.NodeName,
-		Coord:      c1,
-	}
-	var reply struct{}
-	if err := a.agent.RPC("Coordinate.Update", &req1, &reply); err != nil {
-		t.Fatalf("err: %s", err)
-	}
+	{
+		req := structs.CoordinateUpdateRequest{
+			Datacenter: a.config.Datacenter,
+			Node:       a.config.NodeName,
+			Coord:      c1,
+		}
+		var reply struct{}
+		if err := a.agent.RPC("Coordinate.Update", &req, &reply); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+	}
 
-	req2 := structs.CoordinateUpdateRequest{
-		Datacenter: a.config.Datacenter,
-		Node:       "dogs",
-		Coord:      c2,
-	}
-	if err := a.agent.RPC("Coordinate.Update", &req2, &reply); err != nil {
-		t.Fatalf("err: %s", err)
-	}
+	{
+		req := structs.RegisterRequest{
+			Datacenter: a.config.Datacenter,
+			Node:       "dogs",
+			Address:    "127.0.0.2",
+		}
+		var reply struct{}
+		if err := a.agent.RPC("Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+	}
+
+	{
+		var reply struct{}
+		req := structs.CoordinateUpdateRequest{
+			Datacenter: a.config.Datacenter,
+			Node:       "dogs",
+			Coord:      c2,
+		}
+		if err := a.agent.RPC("Coordinate.Update", &req, &reply); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+	}
 
 	// Wait for the updates to get flushed to the data store.
 	time.Sleep(2 * updatePeriod)

View File

@@ -512,21 +512,21 @@ func TestCatalogListNodes_DistanceSort(t *testing.T) {
 	defer codec.Close()
 
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
-	if err := s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "aaa", Address: "127.0.0.1"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if err := s1.fsm.State().EnsureNode(2, structs.Node{"foo", "127.0.0.2"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(2, &structs.Node{Node: "foo", Address: "127.0.0.2"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if err := s1.fsm.State().EnsureNode(3, structs.Node{"bar", "127.0.0.3"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(3, &structs.Node{Node: "bar", Address: "127.0.0.3"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if err := s1.fsm.State().EnsureNode(4, structs.Node{"baz", "127.0.0.4"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(4, &structs.Node{Node: "baz", Address: "127.0.0.4"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
 	// Set all but one of the nodes to known coordinates.
-	updates := []structs.Coordinate{
+	updates := structs.Coordinates{
 		{"foo", generateCoordinate(2 * time.Millisecond)},
 		{"bar", generateCoordinate(5 * time.Millisecond)},
 		{"baz", generateCoordinate(1 * time.Millisecond)},
@@ -870,17 +870,17 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) {
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
 
 	// Add a few nodes for the associated services.
-	s1.fsm.State().EnsureNode(1, structs.Node{"aaa", "127.0.0.1"})
-	s1.fsm.State().EnsureService(2, "aaa", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.1", 5000, false})
-	s1.fsm.State().EnsureNode(3, structs.Node{"foo", "127.0.0.2"})
-	s1.fsm.State().EnsureService(4, "foo", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.2", 5000, false})
-	s1.fsm.State().EnsureNode(5, structs.Node{"bar", "127.0.0.3"})
-	s1.fsm.State().EnsureService(6, "bar", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.3", 5000, false})
-	s1.fsm.State().EnsureNode(7, structs.Node{"baz", "127.0.0.4"})
-	s1.fsm.State().EnsureService(8, "baz", &structs.NodeService{"db", "db", []string{"primary"}, "127.0.0.4", 5000, false})
+	s1.fsm.State().EnsureNode(1, &structs.Node{Node: "aaa", Address: "127.0.0.1"})
+	s1.fsm.State().EnsureService(2, "aaa", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
+	s1.fsm.State().EnsureNode(3, &structs.Node{Node: "foo", Address: "127.0.0.2"})
+	s1.fsm.State().EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.2", Port: 5000})
+	s1.fsm.State().EnsureNode(5, &structs.Node{Node: "bar", Address: "127.0.0.3"})
+	s1.fsm.State().EnsureService(6, "bar", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.3", Port: 5000})
+	s1.fsm.State().EnsureNode(7, &structs.Node{Node: "baz", Address: "127.0.0.4"})
+	s1.fsm.State().EnsureService(8, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.4", Port: 5000})
 
 	// Set all but one of the nodes to known coordinates.
-	updates := []structs.Coordinate{
+	updates := structs.Coordinates{
 		{"foo", generateCoordinate(2 * time.Millisecond)},
 		{"bar", generateCoordinate(5 * time.Millisecond)},
 		{"baz", generateCoordinate(1 * time.Millisecond)},
@@ -900,13 +900,13 @@ func TestCatalogListServiceNodes_DistanceSort(t *testing.T) {
 	if out.ServiceNodes[0].Node != "aaa" {
 		t.Fatalf("bad: %v", out)
 	}
-	if out.ServiceNodes[1].Node != "foo" {
+	if out.ServiceNodes[1].Node != "bar" {
 		t.Fatalf("bad: %v", out)
 	}
-	if out.ServiceNodes[2].Node != "bar" {
+	if out.ServiceNodes[2].Node != "baz" {
 		t.Fatalf("bad: %v", out)
 	}
-	if out.ServiceNodes[3].Node != "baz" {
+	if out.ServiceNodes[3].Node != "foo" {
 		t.Fatalf("bad: %v", out)
 	}

View File

@@ -49,7 +49,8 @@ func (c *Coordinate) batchUpdate() {
 		}
 	}
 }
 
-// batchApplyUpdates applies all pending updates to the Raft log in a series of batches.
+// batchApplyUpdates applies all pending updates to the Raft log in a series of
+// batches.
 func (c *Coordinate) batchApplyUpdates() error {
 	// Grab the pending updates and release the lock so we can still handle
 	// incoming messages.
@@ -68,14 +69,14 @@ func (c *Coordinate) batchApplyUpdates() error {
 	// Transform the map into a slice that we can feed to the Raft log in
 	// batches.
-	updates := make([]structs.Coordinate, size)
 	i := 0
+	updates := make(structs.Coordinates, size)
 	for node, coord := range pending {
 		if !(i < size) {
 			break
 		}
 
-		updates[i] = structs.Coordinate{node, coord}
+		updates[i] = &structs.Coordinate{node, coord}
 		i++
 	}
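The loop above drains the pending map into at most size entries; the slice is then split into fixed-size Raft applies. The splitting itself is a standard pattern; here is a self-contained sketch of it (the function and parameter names are illustrative, and the real limits come from the server's CoordinateUpdateBatchSize and CoordinateUpdateMaxBatches settings):

// Sketch: split pending coordinate updates into fixed-size batches and
// apply each batch as a single Raft entry. Names here are illustrative.
func applyInBatches(updates structs.Coordinates, batchSize int,
    apply func(structs.Coordinates) error) error {
    for start := 0; start < len(updates); start += batchSize {
        end := start + batchSize
        if end > len(updates) {
            end = len(updates)
        }
        if err := apply(updates[start:end]); err != nil {
            return err
        }
    }
    return nil
}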
@@ -117,24 +118,6 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
 	return nil
 }
 
-// Get returns the coordinate of the given node.
-func (c *Coordinate) Get(args *structs.NodeSpecificRequest,
-	reply *structs.IndexedCoordinate) error {
-	if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done {
-		return err
-	}
-
-	state := c.srv.fsm.State()
-	return c.srv.blockingRPC(&args.QueryOptions,
-		&reply.QueryMeta,
-		state.QueryTables("Coordinates"),
-		func() error {
-			var err error
-			reply.Index, reply.Coord, err = state.CoordinateGet(args.Node)
-			return err
-		})
-}
-
 // ListDatacenters returns the list of datacenters and their respective nodes
 // and the raw coordinates of those nodes (if no coordinates are available for
 // any of the nodes, the node list may be empty).
@@ -174,10 +157,14 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
 	state := c.srv.fsm.State()
 	return c.srv.blockingRPC(&args.QueryOptions,
 		&reply.QueryMeta,
-		state.QueryTables("Coordinates"),
+		state.GetQueryWatch("Coordinates"),
 		func() error {
-			var err error
-			reply.Index, reply.Coordinates, err = state.Coordinates()
-			return err
+			index, coords, err := state.Coordinates()
+			if err != nil {
+				return err
+			}
+
+			reply.Index, reply.Coordinates = index, coords
+			return nil
 		})
 }

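With ListNodes wired into blockingRPC and a watch on the coordinates table, callers can long-poll for coordinate changes. A hedged sketch of such a client loop using the request and reply types shown above; the loop structure is an assumption for illustration, not code from this commit:

// Sketch: repeatedly block on Coordinate.ListNodes, waking whenever the
// coordinates table advances past the last index we saw.
func watchCoordinates(rpc func(method string, args, reply interface{}) error) error {
    var lastIndex uint64
    for {
        req := structs.DCSpecificRequest{
            Datacenter:   "dc1",
            QueryOptions: structs.QueryOptions{MinQueryIndex: lastIndex},
        }
        var reply structs.IndexedCoordinates
        if err := rpc("Coordinate.ListNodes", &req, &reply); err != nil {
            return err
        }
        lastIndex = reply.Index
        // ... consume reply.Coordinates ...
    }
}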
View File

@@ -57,6 +57,20 @@ func TestCoordinate_Update(t *testing.T) {
 	defer codec.Close()
 
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
 
+	// Register some nodes.
+	nodes := []string{"node1", "node2"}
+	for _, node := range nodes {
+		req := structs.RegisterRequest{
+			Datacenter: "dc1",
+			Node:       node,
+			Address:    "127.0.0.1",
+		}
+		var reply struct{}
+		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
 	// Send an update for the first node.
 	arg1 := structs.CoordinateUpdateRequest{
 		Datacenter: "dc1",
@@ -81,14 +95,14 @@ func TestCoordinate_Update(t *testing.T) {
 	// Make sure the updates did not yet apply because the update period
 	// hasn't expired.
 	state := s1.fsm.State()
-	_, c, err := state.CoordinateGet("node1")
+	c, err := state.CoordinateGetRaw("node1")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 	if c != nil {
 		t.Fatalf("should be nil because the update should be batched")
 	}
-	_, c, err = state.CoordinateGet("node2")
+	c, err = state.CoordinateGetRaw("node2")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -96,9 +110,16 @@ func TestCoordinate_Update(t *testing.T) {
 		t.Fatalf("should be nil because the update should be batched")
 	}
 
+	// Send another update for the second node. It should take precedence
+	// since there will be two updates in the same batch.
+	arg2.Coord = generateRandomCoordinate()
+	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
 	// Wait a while and the updates should get picked up.
 	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
-	_, c, err = state.CoordinateGet("node1")
+	c, err = state.CoordinateGetRaw("node1")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -106,7 +127,7 @@ func TestCoordinate_Update(t *testing.T) {
 		t.Fatalf("should return a coordinate but it's nil")
 	}
 	verifyCoordinatesEqual(t, c, arg1.Coord)
-	_, c, err = state.CoordinateGet("node2")
+	c, err = state.CoordinateGetRaw("node2")
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
@@ -115,10 +136,23 @@ func TestCoordinate_Update(t *testing.T) {
 	}
 	verifyCoordinatesEqual(t, c, arg2.Coord)
 
+	// Register a bunch of additional nodes.
+	spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
+	for i := 0; i < spamLen; i++ {
+		req := structs.RegisterRequest{
+			Datacenter: "dc1",
+			Node:       fmt.Sprintf("bogusnode%d", i),
+			Address:    "127.0.0.1",
+		}
+		var reply struct{}
+		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
 	// Now spam some coordinate updates and make sure it starts throwing
 	// them away if they exceed the batch allowance. Note we have to make
 	// unique names since these are held in map by node name.
-	spamLen := s1.config.CoordinateUpdateBatchSize*s1.config.CoordinateUpdateMaxBatches + 1
 	for i := 0; i < spamLen; i++ {
 		arg1.Node = fmt.Sprintf("bogusnode%d", i)
 		arg1.Coord = generateRandomCoordinate()
@@ -132,7 +166,7 @@ func TestCoordinate_Update(t *testing.T) {
 	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
 	numDropped := 0
 	for i := 0; i < spamLen; i++ {
-		_, c, err = state.CoordinateGet(fmt.Sprintf("bogusnode%d", i))
+		c, err = state.CoordinateGetRaw(fmt.Sprintf("bogusnode%d", i))
 		if err != nil {
 			t.Fatalf("err: %v", err)
 		}
@@ -153,54 +187,6 @@ func TestCoordinate_Update(t *testing.T) {
 	}
 }
 
-func TestCoordinate_Get(t *testing.T) {
-	dir1, s1 := testServer(t)
-	defer os.RemoveAll(dir1)
-	defer s1.Shutdown()
-	codec := rpcClient(t, s1)
-	defer codec.Close()
-
-	testutil.WaitForLeader(t, s1.RPC, "dc1")
-
-	arg := structs.CoordinateUpdateRequest{
-		Datacenter: "dc1",
-		Node:       "node1",
-		Coord:      generateRandomCoordinate(),
-	}
-
-	// Send an initial update, waiting a little while for the batch update
-	// to run.
-	var out struct{}
-	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
-
-	// Query the coordinate via RPC.
-	arg2 := structs.NodeSpecificRequest{
-		Datacenter: "dc1",
-		Node:       "node1",
-	}
-	coord := structs.IndexedCoordinate{}
-	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Get", &arg2, &coord); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	verifyCoordinatesEqual(t, coord.Coord, arg.Coord)
-
-	// Send another coordinate update, waiting after for the flush.
-	arg.Coord = generateRandomCoordinate()
-	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg, &out); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	time.Sleep(2 * s1.config.CoordinateUpdatePeriod)
-
-	// Now re-query and make sure the results are fresh.
-	if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Get", &arg2, &coord); err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	verifyCoordinatesEqual(t, coord.Coord, arg.Coord)
-}
-
 func TestCoordinate_ListDatacenters(t *testing.T) {
 	dir1, s1 := testServer(t)
 	defer os.RemoveAll(dir1)
@@ -240,6 +226,20 @@ func TestCoordinate_ListNodes(t *testing.T) {
 	defer codec.Close()
 
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
 
+	// Register some nodes.
+	nodes := []string{"foo", "bar", "baz"}
+	for _, node := range nodes {
+		req := structs.RegisterRequest{
+			Datacenter: "dc1",
+			Node:       node,
+			Address:    "127.0.0.1",
+		}
+		var reply struct{}
+		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
 	// Send coordinate updates for a few nodes, waiting a little while for
 	// the batch update to run.
 	arg1 := structs.CoordinateUpdateRequest{

View File

@@ -253,7 +253,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
 // update interface that the coordinate endpoint exposes, so we made it single
 // purpose and avoided the opcode convention.
 func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
-	var updates []structs.Coordinate
+	var updates structs.Coordinates
 	if err := structs.Decode(buf, &updates); err != nil {
 		panic(fmt.Errorf("failed to decode batch updates: %v", err))
 	}
@@ -362,11 +362,12 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
 			}
 
 		case structs.CoordinateBatchUpdateType:
-			var req []structs.Coordinate
+			var req structs.Coordinates
 			if err := dec.Decode(&req); err != nil {
 				return err
 			}
-			if err := c.state.CoordinateBatchUpdate(header.LastIndex, req); err != nil {
+			if err := restore.Coordinates(header.LastIndex, req); err != nil {
 				return err
 			}
@@ -472,14 +473,20 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
 		}
 	}
 
-	// Save the coordinates separately so we can use the existing batch
-	// interface.
-	sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)})
-	coords := s.state.Coordinates()
-	if err := encoder.Encode(&coords); err != nil {
+	// Save the coordinates separately since they are not part of the
+	// register request interface. To avoid copying them out, we turn
+	// them into batches with a single coordinate each.
+	coords, err := s.state.Coordinates()
+	if err != nil {
 		return err
 	}
+	for coord := coords.Next(); coord != nil; coord = coords.Next() {
+		sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)})
+		updates := structs.Coordinates{coord.(*structs.Coordinate)}
+		if err := encoder.Encode(&updates); err != nil {
+			return err
+		}
+	}
 	return nil
 }

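For intuition, each coordinate now lands in the snapshot stream as a message-type byte followed by a msgpack-encoded one-element batch, which is exactly the framing the CoordinateBatchUpdateType case in Restore peels apart. A small, self-contained sketch of that round trip using hashicorp's msgpack codec; the stand-in type and the opcode value are assumptions for illustration, not code from this commit:

package main

import (
    "bytes"
    "fmt"

    "github.com/hashicorp/go-msgpack/codec"
)

// coordEntry stands in for structs.Coordinate; the real type carries a
// *coordinate.Coordinate rather than a bare vector.
type coordEntry struct {
    Node string
    Vec  []float64
}

func main() {
    var buf bytes.Buffer
    handle := &codec.MsgpackHandle{}

    // Write side: a message-type byte, then one msgpack-encoded
    // single-element batch, mirroring persistNodes above.
    const coordinateBatchUpdateType = 6 // illustrative value, not the real opcode
    buf.WriteByte(coordinateBatchUpdateType)
    if err := codec.NewEncoder(&buf, handle).Encode([]coordEntry{
        {Node: "node1", Vec: []float64{0.1, 0.2}},
    }); err != nil {
        panic(err)
    }

    // Read side: peel the type byte, then decode the batch, mirroring
    // the CoordinateBatchUpdateType case in Restore.
    msgType, _ := buf.ReadByte()
    var batch []coordEntry
    if err := codec.NewDecoder(&buf, handle).Decode(&batch); err != nil {
        panic(err)
    }
    fmt.Println(msgType, batch[0].Node, batch[0].Vec)
}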
View File

@@ -383,11 +383,19 @@ func TestFSM_SnapshotRestore(t *testing.T) {
 		t.Fatalf("bad index: %d", idx)
 	}
 
-	coord := generateRandomCoordinate()
-	coords := []structs.Coordinate{
-		structs.Coordinate{"foo", coord},
-	}
-	fsm.state.CoordinateBatchUpdate(13, coords)
+	updates := structs.Coordinates{
+		&structs.Coordinate{
+			Node:  "baz",
+			Coord: generateRandomCoordinate(),
+		},
+		&structs.Coordinate{
+			Node:  "foo",
+			Coord: generateRandomCoordinate(),
+		},
+	}
+	if err := fsm.state.CoordinateBatchUpdate(13, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
 
 	// Snapshot
 	snap, err := fsm.Snapshot()
@@ -499,12 +507,12 @@ func TestFSM_SnapshotRestore(t *testing.T) {
 	}()
 
 	// Verify coordinates are restored
-	_, c, err := fsm2.state.CoordinateGet("foo")
+	_, coords, err := fsm2.state.Coordinates()
 	if err != nil {
-		t.Fatalf("err: %v", err)
+		t.Fatalf("err: %s", err)
 	}
-	if c == nil || !reflect.DeepEqual(c, coord) {
-		t.Fatalf("coordinate is missing or doesn't match, %v != %v", c, coord)
+	if !reflect.DeepEqual(coords, updates) {
+		t.Fatalf("bad: %#v", coords)
 	}
 }
 
@@ -745,19 +753,17 @@ func TestFSM_KVSCheckAndSet(t *testing.T) {
 }
 
 func TestFSM_CoordinateUpdate(t *testing.T) {
-	path, err := ioutil.TempDir("", "fsm")
+	fsm, err := NewFSM(nil, os.Stderr)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	defer os.RemoveAll(path)
 
-	fsm, err := NewFSM(nil, path, os.Stderr)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	defer fsm.Close()
+	// Register some nodes.
+	fsm.state.EnsureNode(1, &structs.Node{Node: "node1", Address: "127.0.0.1"})
+	fsm.state.EnsureNode(2, &structs.Node{Node: "node2", Address: "127.0.0.1"})
 
 	// Write a batch of two coordinates.
-	updates := []*structs.Coordinate{
+	updates := structs.Coordinates{
 		&structs.Coordinate{
 			Node:  "node1",
 			Coord: generateRandomCoordinate(),
@@ -777,23 +783,13 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
 	}
 
 	// Read back the two coordinates to make sure they got updated.
-	_, c, err := fsm.state.CoordinateGet(updates[0].Node)
+	_, coords, err := fsm.state.Coordinates()
 	if err != nil {
-		t.Fatalf("err: %v", err)
+		t.Fatalf("err: %s", err)
 	}
-	if c == nil {
-		t.Fatalf("missing")
+	if !reflect.DeepEqual(coords, updates) {
+		t.Fatalf("bad: %#v", coords)
 	}
-	verifyCoordinatesEqual(t, c, updates[0].Coord)
-
-	_, c, err = fsm.state.CoordinateGet(updates[1].Node)
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	if c == nil {
-		t.Fatalf("missing")
-	}
-	verifyCoordinatesEqual(t, c, updates[1].Coord)
 }
 
 func TestFSM_SessionCreate_Destroy(t *testing.T) {

View File

@@ -64,13 +64,13 @@ func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
 	defer codec.Close()
 
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
-	if err := s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.2"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.2"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if err := s1.fsm.State().EnsureNode(2, structs.Node{"bar", "127.0.0.3"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.3"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	updates := []structs.Coordinate{
+	updates := structs.Coordinates{
 		{"foo", generateCoordinate(1 * time.Millisecond)},
 		{"bar", generateCoordinate(2 * time.Millisecond)},
 	}
@@ -228,13 +228,13 @@ func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
 	defer codec.Close()
 
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
-	if err := s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.2"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.2"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if err := s1.fsm.State().EnsureNode(2, structs.Node{"bar", "127.0.0.3"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.3"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	updates := []structs.Coordinate{
+	updates := structs.Coordinates{
 		{"foo", generateCoordinate(1 * time.Millisecond)},
 		{"bar", generateCoordinate(2 * time.Millisecond)},
 	}
@@ -399,13 +399,13 @@ func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
 	defer codec.Close()
 
 	testutil.WaitForLeader(t, s1.RPC, "dc1")
-	if err := s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.2"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.2"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	if err := s1.fsm.State().EnsureNode(2, structs.Node{"bar", "127.0.0.3"}); err != nil {
+	if err := s1.fsm.State().EnsureNode(2, &structs.Node{Node: "bar", Address: "127.0.0.3"}); err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	updates := []structs.Coordinate{
+	updates := structs.Coordinates{
 		{"foo", generateCoordinate(1 * time.Millisecond)},
 		{"bar", generateCoordinate(2 * time.Millisecond)},
 	}

View File

@@ -34,7 +34,7 @@ func (s *Server) newNodeSorter(c *coordinate.Coordinate, nodes structs.Nodes) (s
 	state := s.fsm.State()
 	vec := make([]float64, len(nodes))
 	for i, node := range nodes {
-		_, coord, err := state.CoordinateGet(node.Node)
+		coord, err := state.CoordinateGetRaw(node.Node)
 		if err != nil {
 			return nil, err
 		}
@@ -73,7 +73,7 @@ func (s *Server) newServiceNodeSorter(c *coordinate.Coordinate, nodes structs.Se
 	state := s.fsm.State()
 	vec := make([]float64, len(nodes))
 	for i, node := range nodes {
-		_, coord, err := state.CoordinateGet(node.Node)
+		coord, err := state.CoordinateGetRaw(node.Node)
 		if err != nil {
 			return nil, err
 		}
@@ -112,7 +112,7 @@ func (s *Server) newHealthCheckSorter(c *coordinate.Coordinate, checks structs.H
 	state := s.fsm.State()
 	vec := make([]float64, len(checks))
 	for i, check := range checks {
-		_, coord, err := state.CoordinateGet(check.Node)
+		coord, err := state.CoordinateGetRaw(check.Node)
 		if err != nil {
 			return nil, err
 		}
@@ -151,7 +151,7 @@ func (s *Server) newCheckServiceNodeSorter(c *coordinate.Coordinate, nodes struc
 	state := s.fsm.State()
 	vec := make([]float64, len(nodes))
 	for i, node := range nodes {
-		_, coord, err := state.CoordinateGet(node.Node.Node)
+		coord, err := state.CoordinateGetRaw(node.Node.Node)
 		if err != nil {
 			return nil, err
 		}
@@ -217,7 +217,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
 	// There won't always be a coordinate for the source node. If there's not
 	// one then we can bail out because there's no meaning for the sort.
 	state := s.fsm.State()
-	_, coord, err := state.CoordinateGet(source.Node)
+	coord, err := state.CoordinateGetRaw(source.Node)
 	if err != nil {
 		return err
 	}
@@ -388,10 +388,7 @@ func getDatacenterMaps(s serfer, dcs []string) []structs.DatacenterMap {
 		nodes := s.GetNodesForDatacenter(dc)
 		for _, node := range nodes {
 			if coord, ok := s.GetCachedCoordinate(node); ok {
-				entry := structs.Coordinate{
-					Node:  node,
-					Coord: coord,
-				}
+				entry := &structs.Coordinate{node, coord}
 				m.Coordinates = append(m.Coordinates, entry)
 			}
 		}

View File

@@ -1,6 +1,7 @@
 package consul
 
 import (
+	"fmt"
 	"math"
 	"net/rpc"
 	"os"
@@ -87,6 +88,20 @@ func verifyCheckServiceNodeSort(t *testing.T, nodes structs.CheckServiceNodes, e
 // 0 1 2 3 4 5 6 7 8 9 10 (ms)
 //
 func seedCoordinates(t *testing.T, codec rpc.ClientCodec, server *Server) {
+	// Register some nodes.
+	for i := 0; i < 5; i++ {
+		req := structs.RegisterRequest{
+			Datacenter: "dc1",
+			Node:       fmt.Sprintf("node%d", i+1),
+			Address:    "127.0.0.1",
+		}
+		var reply struct{}
+		if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Seed the fixed setup of the nodes.
 	updates := []structs.CoordinateUpdateRequest{
 		structs.CoordinateUpdateRequest{
 			Datacenter: "dc1",
@@ -137,12 +152,12 @@ func TestRtt_sortNodesByDistanceFrom(t *testing.T) {
 	seedCoordinates(t, codec, server)
 	nodes := structs.Nodes{
-		structs.Node{Node: "apple"},
-		structs.Node{Node: "node1"},
-		structs.Node{Node: "node2"},
-		structs.Node{Node: "node3"},
-		structs.Node{Node: "node4"},
-		structs.Node{Node: "node5"},
+		&structs.Node{Node: "apple"},
+		&structs.Node{Node: "node1"},
+		&structs.Node{Node: "node2"},
+		&structs.Node{Node: "node3"},
+		&structs.Node{Node: "node4"},
+		&structs.Node{Node: "node5"},
 	}
 
 	// The zero value for the source should not trigger any sorting.
@@ -198,12 +213,12 @@ func TestRtt_sortNodesByDistanceFrom_Nodes(t *testing.T) {
 	seedCoordinates(t, codec, server)
 	nodes := structs.Nodes{
-		structs.Node{Node: "apple"},
-		structs.Node{Node: "node1"},
-		structs.Node{Node: "node2"},
-		structs.Node{Node: "node3"},
-		structs.Node{Node: "node4"},
-		structs.Node{Node: "node5"},
+		&structs.Node{Node: "apple"},
+		&structs.Node{Node: "node1"},
+		&structs.Node{Node: "node2"},
+		&structs.Node{Node: "node3"},
+		&structs.Node{Node: "node4"},
+		&structs.Node{Node: "node5"},
 	}
 
 	// Now sort relative to node1, note that apple doesn't have any
@@ -247,12 +262,12 @@ func TestRtt_sortNodesByDistanceFrom_ServiceNodes(t *testing.T) {
 	seedCoordinates(t, codec, server)
 	nodes := structs.ServiceNodes{
-		structs.ServiceNode{Node: "apple"},
-		structs.ServiceNode{Node: "node1"},
-		structs.ServiceNode{Node: "node2"},
-		structs.ServiceNode{Node: "node3"},
-		structs.ServiceNode{Node: "node4"},
-		structs.ServiceNode{Node: "node5"},
+		&structs.ServiceNode{Node: "apple"},
+		&structs.ServiceNode{Node: "node1"},
+		&structs.ServiceNode{Node: "node2"},
+		&structs.ServiceNode{Node: "node3"},
+		&structs.ServiceNode{Node: "node4"},
+		&structs.ServiceNode{Node: "node5"},
 	}
 
 	// Now sort relative to node1, note that apple doesn't have any
@@ -345,12 +360,12 @@ func TestRtt_sortNodesByDistanceFrom_CheckServiceNodes(t *testing.T) {
 	seedCoordinates(t, codec, server)
 	nodes := structs.CheckServiceNodes{
-		structs.CheckServiceNode{Node: structs.Node{Node: "apple"}},
-		structs.CheckServiceNode{Node: structs.Node{Node: "node1"}},
-		structs.CheckServiceNode{Node: structs.Node{Node: "node2"}},
-		structs.CheckServiceNode{Node: structs.Node{Node: "node3"}},
-		structs.CheckServiceNode{Node: structs.Node{Node: "node4"}},
-		structs.CheckServiceNode{Node: structs.Node{Node: "node5"}},
+		structs.CheckServiceNode{Node: &structs.Node{Node: "apple"}},
+		structs.CheckServiceNode{Node: &structs.Node{Node: "node1"}},
+		structs.CheckServiceNode{Node: &structs.Node{Node: "node2"}},
+		structs.CheckServiceNode{Node: &structs.Node{Node: "node3"}},
+		structs.CheckServiceNode{Node: &structs.Node{Node: "node4"}},
+		structs.CheckServiceNode{Node: &structs.Node{Node: "node5"}},
 	}
 
 	// Now sort relative to node1, note that apple doesn't have any

View File

@@ -29,6 +29,7 @@ func stateStoreSchema() *memdb.DBSchema {
 		sessionsTableSchema,
 		sessionChecksTableSchema,
 		aclsTableSchema,
+		coordinatesTableSchema,
 	}
 
 	// Add the tables to the root schema
@@ -345,3 +346,22 @@ func aclsTableSchema() *memdb.TableSchema {
 		},
 	}
 }
+
+// coordinatesTableSchema returns a new table schema used for storing
+// network coordinates.
+func coordinatesTableSchema() *memdb.TableSchema {
+	return &memdb.TableSchema{
+		Name: "coordinates",
+		Indexes: map[string]*memdb.IndexSchema{
+			"id": &memdb.IndexSchema{
+				Name:         "id",
+				AllowMissing: false,
+				Unique:       true,
+				Indexer: &memdb.StringFieldIndex{
+					Field:     "Node",
+					Lowercase: true,
+				},
+			},
+		},
+	}
+}

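To see what this schema buys, here is a minimal, standalone go-memdb sketch (separate from Consul's state store wrapper) that inserts a coordinate-like row and looks it up through the "id" index on the Node field; the table and index names match the commit, everything else is illustrative:

package main

import (
    "fmt"

    "github.com/hashicorp/go-memdb"
)

type Coordinate struct {
    Node string
}

func main() {
    schema := &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{
            "coordinates": &memdb.TableSchema{
                Name: "coordinates",
                Indexes: map[string]*memdb.IndexSchema{
                    "id": &memdb.IndexSchema{
                        Name:    "id",
                        Unique:  true,
                        Indexer: &memdb.StringFieldIndex{Field: "Node", Lowercase: true},
                    },
                },
            },
        },
    }
    db, err := memdb.NewMemDB(schema)
    if err != nil {
        panic(err)
    }

    // Insert behaves as an upsert on the unique "id" index, which is why
    // repeated coordinate updates for a node replace the old entry.
    txn := db.Txn(true)
    if err := txn.Insert("coordinates", &Coordinate{Node: "node1"}); err != nil {
        panic(err)
    }
    txn.Commit()

    // Reads run in a read-only transaction against the same index.
    read := db.Txn(false)
    defer read.Abort()
    raw, _ := read.First("coordinates", "id", "node1")
    fmt.Println(raw.(*Coordinate).Node)
}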
View File

@@ -8,6 +8,7 @@ import (
 	"github.com/hashicorp/consul/consul/structs"
 	"github.com/hashicorp/go-memdb"
+	"github.com/hashicorp/serf/coordinate"
 )
 
 var (
@@ -196,6 +197,15 @@ func (s *StateSnapshot) ACLs() (memdb.ResultIterator, error) {
 	return iter, nil
 }
 
+// Coordinates is used to pull all the coordinates from the snapshot.
+func (s *StateSnapshot) Coordinates() (memdb.ResultIterator, error) {
+	iter, err := s.tx.Get("coordinates", "id")
+	if err != nil {
+		return nil, err
+	}
+	return iter, nil
+}
+
 // Restore is used to efficiently manage restoring a large amount of data into
 // the state store. It works by doing all the restores inside of a single
 // transaction.
@@ -299,6 +309,24 @@ func (s *StateRestore) ACL(acl *structs.ACL) error {
 	return nil
 }
 
+// Coordinates is used when restoring from a snapshot. For general inserts, use
+// CoordinateBatchUpdate. We do less vetting of the updates here because they
+// already got checked on the way in during a batch update.
+func (s *StateRestore) Coordinates(idx uint64, updates structs.Coordinates) error {
+	for _, update := range updates {
+		if err := s.tx.Insert("coordinates", update); err != nil {
+			return fmt.Errorf("failed restoring coordinate: %s", err)
+		}
+	}
+
+	if err := indexUpdateMaxTxn(s.tx, idx, "coordinates"); err != nil {
+		return fmt.Errorf("failed updating index: %s", err)
+	}
+
+	s.watches.Arm("coordinates")
+	return nil
+}
+
 // maxIndex is a helper used to retrieve the highest known index
 // amongst a set of tables in the db.
 func (s *StateStore) maxIndex(tables ...string) uint64 {
@@ -379,6 +407,8 @@ func (s *StateStore) getWatchTables(method string) []string {
 		return []string{"sessions"}
 	case "ACLGet", "ACLList":
 		return []string{"acls"}
+	case "Coordinates":
+		return []string{"coordinates"}
 	}
 
 	panic(fmt.Sprintf("Unknown method %s", method))
@@ -583,7 +613,6 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
 	// Use a watch manager since the inner functions can perform multiple
 	// ops per table.
 	watches := NewDumbWatchManager(s.tableWatches)
-	watches.Arm("nodes")
 
 	// Delete all services associated with the node and update the service index.
 	services, err := tx.Get("services", "node", nodeID)
@@ -620,6 +649,21 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
 		}
 	}
 
+	// Delete any coordinate associated with this node.
+	coord, err := tx.First("coordinates", "id", nodeID)
+	if err != nil {
+		return fmt.Errorf("failed coordinate lookup: %s", err)
+	}
+	if coord != nil {
+		if err := tx.Delete("coordinates", coord); err != nil {
+			return fmt.Errorf("failed deleting coordinate: %s", err)
+		}
+		if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
+			return fmt.Errorf("failed updating index: %s", err)
+		}
+		watches.Arm("coordinates")
+	}
+
 	// Delete the node and update the index.
 	if err := tx.Delete("nodes", node); err != nil {
 		return fmt.Errorf("failed deleting node: %s", err)
@@ -645,6 +689,7 @@ func (s *StateStore) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeID string) err
 		}
 	}
 
+	watches.Arm("nodes")
 	tx.Defer(func() { watches.Notify() })
 	return nil
 }
@@ -2231,3 +2276,84 @@ func (s *StateStore) aclDeleteTxn(tx *memdb.Txn, idx uint64, aclID string) error
 	tx.Defer(func() { s.tableWatches["acls"].Notify() })
 	return nil
 }
+
+// CoordinateGetRaw queries for the coordinate of the given node. This is an
+// unusual state store method because it just returns the raw coordinate or
+// nil, none of the Raft or node information is returned. This hits the 90%
+// internal-to-Consul use case for this data, and this isn't exposed via an
+// endpoint, so it doesn't matter that the Raft info isn't available.
+func (s *StateStore) CoordinateGetRaw(node string) (*coordinate.Coordinate, error) {
+	tx := s.db.Txn(false)
+	defer tx.Abort()
+
+	// Pull the full coordinate entry.
+	coord, err := tx.First("coordinates", "id", node)
+	if err != nil {
+		return nil, fmt.Errorf("failed coordinate lookup: %s", err)
+	}
+
+	// Pick out just the raw coordinate.
+	if coord != nil {
+		return coord.(*structs.Coordinate).Coord, nil
+	}
+	return nil, nil
+}
+
+// Coordinates queries for all nodes with coordinates.
+func (s *StateStore) Coordinates() (uint64, structs.Coordinates, error) {
+	tx := s.db.Txn(false)
+	defer tx.Abort()
+
+	// Get the table index.
+	idx := maxIndexTxn(tx, s.getWatchTables("Coordinates")...)
+
+	// Pull all the coordinates.
+	coords, err := tx.Get("coordinates", "id")
+	if err != nil {
+		return 0, nil, fmt.Errorf("failed coordinate lookup: %s", err)
+	}
+	var results structs.Coordinates
+	for coord := coords.Next(); coord != nil; coord = coords.Next() {
+		results = append(results, coord.(*structs.Coordinate))
+	}
+	return idx, results, nil
+}
+
+// CoordinateBatchUpdate processes a batch of coordinate updates and applies
+// them in a single transaction.
+func (s *StateStore) CoordinateBatchUpdate(idx uint64, updates structs.Coordinates) error {
+	tx := s.db.Txn(true)
+	defer tx.Abort()
+
+	// Upsert the coordinates.
+	for _, update := range updates {
+		// Since the cleanup of coordinates is tied to deletion of
+		// nodes, we silently drop any updates for nodes that we don't
+		// know about. This might be possible during normal operation
+		// if we happen to get a coordinate update for a node that
+		// hasn't been able to add itself to the catalog yet. Since we
+		// don't carefully sequence this, and since it will fix itself
+		// on the next coordinate update from that node, we don't return
+		// an error or log anything.
+		node, err := tx.First("nodes", "id", update.Node)
+		if err != nil {
+			return fmt.Errorf("failed node lookup: %s", err)
+		}
+		if node == nil {
+			continue
+		}
+
+		if err := tx.Insert("coordinates", update); err != nil {
+			return fmt.Errorf("failed inserting coordinate: %s", err)
+		}
+	}
+
+	// Update the index.
+	if err := tx.Insert("index", &IndexEntry{"coordinates", idx}); err != nil {
+		return fmt.Errorf("failed updating index: %s", err)
+	}
+
+	tx.Defer(func() { s.tableWatches["coordinates"].Notify() })
+	tx.Commit()
+	return nil
+}

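A detail worth calling out in CoordinateBatchUpdate above: the data change and the bump to the per-table "index" row happen in one write transaction, which is what lets blocking queries compare against a single last-modified index. A self-contained go-memdb sketch of that bookkeeping pattern (table names match the commit; the IndexEntry shape and everything else are illustrative):

package main

import (
    "fmt"

    "github.com/hashicorp/go-memdb"
)

// IndexEntry mirrors the bookkeeping row Consul keeps per table so that
// blocking queries can compare against a single "last modified" index.
type IndexEntry struct {
    Key   string
    Value uint64
}

type Coordinate struct {
    Node string
}

func main() {
    schema := &memdb.DBSchema{
        Tables: map[string]*memdb.TableSchema{
            "coordinates": {
                Name: "coordinates",
                Indexes: map[string]*memdb.IndexSchema{
                    "id": {Name: "id", Unique: true,
                        Indexer: &memdb.StringFieldIndex{Field: "Node"}},
                },
            },
            "index": {
                Name: "index",
                Indexes: map[string]*memdb.IndexSchema{
                    "id": {Name: "id", Unique: true,
                        Indexer: &memdb.StringFieldIndex{Field: "Key"}},
                },
            },
        },
    }
    db, err := memdb.NewMemDB(schema)
    if err != nil {
        panic(err)
    }

    // One write transaction covers the data change and the index bump,
    // just like CoordinateBatchUpdate above.
    tx := db.Txn(true)
    tx.Insert("coordinates", &Coordinate{Node: "node1"})
    tx.Insert("index", &IndexEntry{"coordinates", 42})
    tx.Commit()

    read := db.Txn(false)
    defer read.Abort()
    raw, _ := read.First("index", "id", "coordinates")
    fmt.Println("coordinates table index:", raw.(*IndexEntry).Value)
}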
View File

@@ -3,6 +3,7 @@ package state
 import (
 	crand "crypto/rand"
 	"fmt"
+	"math/rand"
 	"reflect"
 	"sort"
 	"strings"
@@ -10,6 +11,7 @@ import (
 	"time"
 
 	"github.com/hashicorp/consul/consul/structs"
+	"github.com/hashicorp/serf/coordinate"
 )
 
 func testUUID() string {
@@ -999,20 +1001,31 @@ func TestStateStore_Node_Watches(t *testing.T) {
 		}
 	})
 
-	// Check that a delete of a node + service + check triggers all three
-	// tables in one shot.
+	// Check that a delete of a node + service + check + coordinate triggers
+	// all tables in one shot.
 	testRegisterNode(t, s, 4, "node1")
 	testRegisterService(t, s, 5, "node1", "service1")
 	testRegisterCheck(t, s, 6, "node1", "service1", "check3", structs.HealthPassing)
+	updates := structs.Coordinates{
+		&structs.Coordinate{
+			Node:  "node1",
+			Coord: generateRandomCoordinate(),
+		},
+	}
+	if err := s.CoordinateBatchUpdate(7, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
 	verifyWatch(t, s.getTableWatch("nodes"), func() {
 		verifyWatch(t, s.getTableWatch("services"), func() {
 			verifyWatch(t, s.getTableWatch("checks"), func() {
+				verifyWatch(t, s.getTableWatch("coordinates"), func() {
 					if err := s.DeleteNode(7, "node1"); err != nil {
 						t.Fatalf("err: %s", err)
 					}
+				})
 			})
 		})
 	})
 }
 
 func TestStateStore_EnsureService(t *testing.T) {
@@ -4721,3 +4734,291 @@ func TestStateStore_ACL_Watches(t *testing.T) {
 		restore.Commit()
 	})
 }
+
+// generateRandomCoordinate creates a random coordinate. This mucks with the
+// underlying structure directly, so it's not really useful for any particular
+// position in the network, but it's a good payload to send through to make
+// sure things come out the other side or get stored correctly.
+func generateRandomCoordinate() *coordinate.Coordinate {
+	config := coordinate.DefaultConfig()
+	coord := coordinate.NewCoordinate(config)
+	for i := range coord.Vec {
+		coord.Vec[i] = rand.NormFloat64()
+	}
+	coord.Error = rand.NormFloat64()
+	coord.Adjustment = rand.NormFloat64()
+	return coord
+}
+
+func TestStateStore_Coordinate_Updates(t *testing.T) {
+	s := testStateStore(t)
+
+	// Make sure the coordinates list starts out empty, and that a query for
+	// a raw coordinate for a nonexistent node doesn't do anything bad.
+	idx, coords, err := s.Coordinates()
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if idx != 0 {
+		t.Fatalf("bad index: %d", idx)
+	}
+	if coords != nil {
+		t.Fatalf("bad: %#v", coords)
+	}
+	coord, err := s.CoordinateGetRaw("nope")
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if coord != nil {
+		t.Fatalf("bad: %#v", coord)
+	}
+
+	// Make an update for nodes that don't exist and make sure they get
+	// ignored.
+	updates := structs.Coordinates{
+		&structs.Coordinate{
+			Node:  "node1",
+			Coord: generateRandomCoordinate(),
+		},
+		&structs.Coordinate{
+			Node:  "node2",
+			Coord: generateRandomCoordinate(),
+		},
+	}
+	if err := s.CoordinateBatchUpdate(1, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Should still be empty, though applying an empty batch does bump
+	// the table index.
+	idx, coords, err = s.Coordinates()
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if idx != 1 {
+		t.Fatalf("bad index: %d", idx)
+	}
+	if coords != nil {
+		t.Fatalf("bad: %#v", coords)
+	}
+
+	// Register the nodes then do the update again.
+	testRegisterNode(t, s, 1, "node1")
+	testRegisterNode(t, s, 2, "node2")
+	if err := s.CoordinateBatchUpdate(3, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Should go through now.
+	idx, coords, err = s.Coordinates()
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if idx != 3 {
+		t.Fatalf("bad index: %d", idx)
+	}
+	if !reflect.DeepEqual(coords, updates) {
+		t.Fatalf("bad: %#v", coords)
+	}
+
+	// Also verify the raw coordinate interface.
+	for _, update := range updates {
+		coord, err := s.CoordinateGetRaw(update.Node)
+		if err != nil {
+			t.Fatalf("err: %s", err)
+		}
+		if !reflect.DeepEqual(coord, update.Coord) {
+			t.Fatalf("bad: %#v", coord)
+		}
+	}
+
+	// Update the coordinate for one of the nodes.
+	updates[1].Coord = generateRandomCoordinate()
+	if err := s.CoordinateBatchUpdate(4, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Verify it got applied.
+	idx, coords, err = s.Coordinates()
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if idx != 4 {
+		t.Fatalf("bad index: %d", idx)
+	}
+	if !reflect.DeepEqual(coords, updates) {
+		t.Fatalf("bad: %#v", coords)
+	}
+
+	// And check the raw coordinate version of the same thing.
+	for _, update := range updates {
+		coord, err := s.CoordinateGetRaw(update.Node)
+		if err != nil {
+			t.Fatalf("err: %s", err)
+		}
+		if !reflect.DeepEqual(coord, update.Coord) {
+			t.Fatalf("bad: %#v", coord)
+		}
+	}
+}
+
+func TestStateStore_Coordinate_Cleanup(t *testing.T) {
+	s := testStateStore(t)
+
+	// Register a node and update its coordinate.
+	testRegisterNode(t, s, 1, "node1")
+	updates := structs.Coordinates{
+		&structs.Coordinate{
+			Node:  "node1",
+			Coord: generateRandomCoordinate(),
+		},
+	}
+	if err := s.CoordinateBatchUpdate(2, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Make sure it's in there.
+	coord, err := s.CoordinateGetRaw("node1")
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if !reflect.DeepEqual(coord, updates[0].Coord) {
+		t.Fatalf("bad: %#v", coord)
+	}
+
+	// Now delete the node.
+	if err := s.DeleteNode(3, "node1"); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Make sure the coordinate is gone.
+	coord, err = s.CoordinateGetRaw("node1")
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if coord != nil {
+		t.Fatalf("bad: %#v", coord)
+	}
+
+	// Make sure the index got updated.
+	idx, coords, err := s.Coordinates()
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	if idx != 3 {
+		t.Fatalf("bad index: %d", idx)
+	}
+	if coords != nil {
+		t.Fatalf("bad: %#v", coords)
+	}
+}
+
+func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
+	s := testStateStore(t)
+
+	// Register two nodes and update their coordinates.
+	testRegisterNode(t, s, 1, "node1")
+	testRegisterNode(t, s, 2, "node2")
+	updates := structs.Coordinates{
+		&structs.Coordinate{
+			Node:  "node1",
+			Coord: generateRandomCoordinate(),
+		},
+		&structs.Coordinate{
+			Node:  "node2",
+			Coord: generateRandomCoordinate(),
+		},
+	}
+	if err := s.CoordinateBatchUpdate(3, updates); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Snapshot the coordinates.
+	snap := s.Snapshot()
+	defer snap.Close()
+
+	// Alter the real state store.
+	trash := structs.Coordinates{
+		&structs.Coordinate{
+			Node:  "node1",
+			Coord: generateRandomCoordinate(),
+		},
+		&structs.Coordinate{
+			Node:  "node2",
+			Coord: generateRandomCoordinate(),
+		},
+	}
+	if err := s.CoordinateBatchUpdate(4, trash); err != nil {
+		t.Fatalf("err: %s", err)
+	}
+
+	// Verify the snapshot.
+	if idx := snap.LastIndex(); idx != 3 {
+		t.Fatalf("bad index: %d", idx)
+	}
+	iter, err := snap.Coordinates()
+	if err != nil {
+		t.Fatalf("err: %s", err)
+	}
+	var dump structs.Coordinates
+	for coord := iter.Next(); coord != nil; coord = iter.Next() {
+		dump = append(dump, coord.(*structs.Coordinate))
+	}
+	if !reflect.DeepEqual(dump, updates) {
+		t.Fatalf("bad: %#v", dump)
+	}
+
+	// Restore the values into a new state store.
+	func() {
+		s := testStateStore(t)
+		restore := s.Restore()
+		if err := restore.Coordinates(5, dump); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+		restore.Commit()
+
+		// Read the restored coordinates back out and verify that they match.
+		idx, res, err := s.Coordinates()
+		if err != nil {
+			t.Fatalf("err: %s", err)
+		}
+		if idx != 5 {
+			t.Fatalf("bad index: %d", idx)
+		}
+		if !reflect.DeepEqual(res, updates) {
+			t.Fatalf("bad: %#v", res)
+		}
+
+		// Check that the index was updated (note that it got passed
+		// in during the restore).
+		if idx := s.maxIndex("coordinates"); idx != 5 {
+			t.Fatalf("bad index: %d", idx)
+		}
+	}()
+}
+
+func TestStateStore_Coordinate_Watches(t *testing.T) {
+	s := testStateStore(t)
+
+	testRegisterNode(t, s, 1, "node1")
+
+	// Call functions that update the coordinates table and make sure a
+	// watch fires each time.
+	verifyWatch(t, s.getTableWatch("coordinates"), func() {
+		updates := structs.Coordinates{
+			&structs.Coordinate{
+				Node:  "node1",
+				Coord: generateRandomCoordinate(),
+			},
+		}
+		if err := s.CoordinateBatchUpdate(2, updates); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+	})
+	verifyWatch(t, s.getTableWatch("coordinates"), func() {
+		if err := s.DeleteNode(3, "node1"); err != nil {
+			t.Fatalf("err: %s", err)
+		}
+	})
+}

View File

@@ -637,6 +637,8 @@ type Coordinate struct {
 	Coord *coordinate.Coordinate
 }
 
+type Coordinates []*Coordinate
+
 // IndexedCoordinate is used to represent a single node's coordinate from the state
 // store.
 type IndexedCoordinate struct {
@@ -647,7 +649,7 @@ type IndexedCoordinate struct {
 // IndexedCoordinates is used to represent a list of nodes and their
 // corresponding raw coordinates.
 type IndexedCoordinates struct {
-	Coordinates []Coordinate
+	Coordinates Coordinates
 	QueryMeta
 }
 
@@ -655,7 +657,7 @@ type IndexedCoordinates struct {
 // associated with a datacenter.
 type DatacenterMap struct {
 	Datacenter  string
-	Coordinates []Coordinate
+	Coordinates Coordinates
 }
 
 // CoordinateUpdateRequest is used to update the network coordinate of a given