mirror of https://github.com/hashicorp/consul
Adds snapshot save and restore of coordinates.
parent
5f754c4a87
commit
e094f5a61d
|
@ -253,7 +253,7 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
|
|||
// update interface that the coordinate endpoint exposes, so we made it single
|
||||
// purpose and avoided the opcode convention.
|
||||
func (c *consulFSM) applyCoordinateBatchUpdate(buf []byte, index uint64) interface{} {
|
||||
var updates []*structs.Coordinate
|
||||
var updates []structs.Coordinate
|
||||
if err := structs.Decode(buf, &updates); err != nil {
|
||||
panic(fmt.Errorf("failed to decode batch updates: %v", err))
|
||||
}
|
||||
|
@ -361,6 +361,15 @@ func (c *consulFSM) Restore(old io.ReadCloser) error {
|
|||
return err
|
||||
}
|
||||
|
||||
case structs.CoordinateBatchUpdateType:
|
||||
var req []structs.Coordinate
|
||||
if err := dec.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.state.CoordinateBatchUpdate(header.LastIndex, req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized msg type: %v", msgType)
|
||||
}
|
||||
|
@ -462,6 +471,15 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Save the coordinates separately so we can use the existing batch
|
||||
// interface.
|
||||
sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)})
|
||||
coords := s.state.Coordinates()
|
||||
if err := encoder.Encode(&coords); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package consul
|
|||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/consul/state"
|
||||
|
@ -382,6 +383,12 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
coord := generateRandomCoordinate()
|
||||
coords := []structs.Coordinate {
|
||||
structs.Coordinate{"foo", coord},
|
||||
}
|
||||
fsm.state.CoordinateBatchUpdate(13, coords)
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
if err != nil {
|
||||
|
@ -490,6 +497,15 @@ func TestFSM_SnapshotRestore(t *testing.T) {
|
|||
t.Fatalf("unexpected extra tombstones")
|
||||
}
|
||||
}()
|
||||
|
||||
// Verify coordinates are restored
|
||||
_, c, err := fsm2.state.CoordinateGet("foo")
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if c == nil || !reflect.DeepEqual(c, coord) {
|
||||
t.Fatalf("coordinate is missing or doesn't match, %v != %v", c, coord)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFSM_KVSSet(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue