Fix fsm serialization and add snapshot/restore

pull/5539/head
Kyle Havlovitz 6 years ago
parent 17aa6a5a34
commit d92577c16b

@ -29,7 +29,8 @@ func init() {
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation)
registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation)
registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation)
}
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -430,18 +431,37 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
}
func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
var req structs.ConfigEntryRequest
func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
return c.applyConfigEntryOperation(index, req)
}
func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} {
req := structs.ConfigEntryRequest{
Entry: &structs.ProxyConfigEntry{},
}
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := c.applyConfigEntryOperation(index, req); err != nil {
panic(err)
}
return true
}
func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error {
switch req.Op {
case structs.ConfigEntryUpsert:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(),
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
[]metrics.Label{{Name: "op", Value: "upsert"}})
return c.state.EnsureConfigEntry(index, req.Entry)
case structs.ConfigEntryDelete:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(),
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
[]metrics.Label{{Name: "op", Value: "delete"}})
return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName())
default:

@ -1379,7 +1379,7 @@ func TestFSM_ConfigEntry(t *testing.T) {
}
{
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req)
assert.Nil(err)
assert.True(fsm.Apply(makeLog(buf)).(bool))
}
@ -1388,6 +1388,8 @@ func TestFSM_ConfigEntry(t *testing.T) {
{
_, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global")
assert.Nil(err)
entry.RaftIndex.CreateIndex = 1
entry.RaftIndex.ModifyIndex = 1
assert.Equal(entry, config)
}
}

@ -1,6 +1,8 @@
package fsm
import (
"fmt"
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
@ -27,6 +29,7 @@ func init() {
registerRestorer(structs.IndexRequestType, restoreIndex)
registerRestorer(structs.ACLTokenSetRequestType, restoreToken)
registerRestorer(structs.ACLPolicySetRequestType, restorePolicy)
registerRestorer(structs.ConfigEntryRequestType, restoreConfigEntry)
}
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
@ -63,6 +66,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
if err := s.persistConnectCAConfig(sink, encoder); err != nil {
return err
}
if err := s.persistConfigEntries(sink, encoder); err != nil {
return err
}
if err := s.persistIndex(sink, encoder); err != nil {
return err
}
@ -360,6 +366,27 @@ func (s *snapshot) persistIntentions(sink raft.SnapshotSink,
return nil
}
func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink,
encoder *codec.Encoder) error {
entries, err := s.state.ConfigEntries()
if err != nil {
return err
}
for _, entry := range entries {
if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil {
return err
}
if err := encoder.Encode(entry.GetKind()); err != nil {
return err
}
if err := encoder.Encode(entry); err != nil {
return err
}
}
return nil
}
func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the indexes
iter, err := s.state.Indexes()
@ -565,3 +592,25 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code
}
return restore.ACLPolicy(&req)
}
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.ConfigEntry
var kind string
if err := decoder.Decode(&kind); err != nil {
return err
}
switch kind {
case structs.ServiceDefaults:
req = &structs.ServiceConfigEntry{}
case structs.ProxyDefaults:
req = &structs.ProxyConfigEntry{}
default:
return fmt.Errorf("invalid config type: %s", kind)
}
if err := decoder.Decode(&req); err != nil {
return err
}
return restore.ConfigEntry(req)
}

@ -202,6 +202,19 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
err = fsm.state.CASetConfig(17, caConfig)
assert.Nil(err)
// Config entries
serviceConfig := &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
}
proxyConfig := &structs.ProxyConfigEntry{
Kind: structs.ProxyDefaults,
Name: "global",
}
assert.Nil(fsm.state.EnsureConfigEntry(18, serviceConfig))
assert.Nil(fsm.state.EnsureConfigEntry(19, proxyConfig))
// Snapshot
snap, err := fsm.Snapshot()
if err != nil {
@ -388,6 +401,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
assert.Nil(err)
assert.Equal(caConfig, caConf)
// Verify config entries are restored
_, serviceConfEntry, err := fsm2.state.ConfigEntry(structs.ServiceDefaults, "foo")
assert.Nil(err)
assert.Equal(serviceConfig, serviceConfEntry)
_, proxyConfEntry, err := fsm2.state.ConfigEntry(structs.ProxyDefaults, "global")
assert.Nil(err)
assert.Equal(proxyConfig, proxyConfEntry)
// Snapshot
snap, err = fsm2.Snapshot()
if err != nil {

@ -57,7 +57,7 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {
return ret, nil
}
// Configuration is used when restoring from a snapshot.
// ConfigEntry is used when restoring from a snapshot.
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
// Insert
if err := s.tx.Insert(configTableName, c); err != nil {
@ -70,7 +70,7 @@ func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
return nil
}
// Configuration is called to get a given config entry.
// ConfigEntry is called to get a given config entry.
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) {
tx := s.db.Txn(true)
defer tx.Abort()
@ -95,6 +95,27 @@ func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, err
return idx, conf, nil
}
// ConfigEntries is called to get all config entry objects.
func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) {
tx := s.db.Txn(true)
defer tx.Abort()
// Get the index
idx := maxIndexTxn(tx, configTableName)
// Get all
iter, err := tx.Get(configTableName, "id")
if err != nil {
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
}
var results []structs.ConfigEntry
for v := iter.Next(); v != nil; v = iter.Next() {
results = append(results, v.(structs.ConfigEntry))
}
return idx, results, nil
}
// EnsureConfigEntry is called to upsert creation of a given config entry.
func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error {
tx := s.db.Txn(true)

@ -29,10 +29,26 @@ type ServiceConfigEntry struct {
RaftIndex
}
func (s *ServiceConfigEntry) GetKind() string {
func (e *ServiceConfigEntry) GetKind() string {
return ServiceDefaults
}
func (e *ServiceConfigEntry) GetName() string {
return e.Name
}
func (e *ServiceConfigEntry) Normalize() error {
return nil
}
func (e *ServiceConfigEntry) Validate() error {
return nil
}
func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex {
return &e.RaftIndex
}
type ConnectConfiguration struct {
SidecarProxy bool
}
@ -75,24 +91,24 @@ type ProxyConfigEntry struct {
RaftIndex
}
func (p *ProxyConfigEntry) GetKind() string {
func (e *ProxyConfigEntry) GetKind() string {
return ProxyDefaults
}
func (p *ProxyConfigEntry) GetName() string {
return p.Name
func (e *ProxyConfigEntry) GetName() string {
return e.Name
}
func (p *ProxyConfigEntry) Normalize() error {
func (e *ProxyConfigEntry) Normalize() error {
return nil
}
func (p *ProxyConfigEntry) Validate() error {
func (e *ProxyConfigEntry) Validate() error {
return nil
}
func (p *ProxyConfigEntry) GetRaftIndex() *RaftIndex {
return &p.RaftIndex
func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex {
return &e.RaftIndex
}
type ConfigEntryOp string

@ -33,29 +33,31 @@ type RaftIndex struct {
// These are serialized between Consul servers and stored in Consul snapshots,
// so entries must only ever be added.
const (
RegisterRequestType MessageType = 0
DeregisterRequestType = 1
KVSRequestType = 2
SessionRequestType = 3
ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat)
TombstoneRequestType = 5
CoordinateBatchUpdateType = 6
PreparedQueryRequestType = 7
TxnRequestType = 8
AutopilotRequestType = 9
AreaRequestType = 10
ACLBootstrapRequestType = 11
IntentionRequestType = 12
ConnectCARequestType = 13
ConnectCAProviderStateType = 14
ConnectCAConfigType = 15 // FSM snapshots only.
IndexRequestType = 16 // FSM snapshots only.
ACLTokenSetRequestType = 17
ACLTokenDeleteRequestType = 18
ACLPolicySetRequestType = 19
ACLPolicyDeleteRequestType = 20
ConnectCALeafRequestType = 21
ConfigEntryRequestType = 22
RegisterRequestType MessageType = 0
DeregisterRequestType = 1
KVSRequestType = 2
SessionRequestType = 3
ACLRequestType = 4 // DEPRECATED (ACL-Legacy-Compat)
TombstoneRequestType = 5
CoordinateBatchUpdateType = 6
PreparedQueryRequestType = 7
TxnRequestType = 8
AutopilotRequestType = 9
AreaRequestType = 10
ACLBootstrapRequestType = 11
IntentionRequestType = 12
ConnectCARequestType = 13
ConnectCAProviderStateType = 14
ConnectCAConfigType = 15 // FSM snapshots only.
IndexRequestType = 16 // FSM snapshots only.
ACLTokenSetRequestType = 17
ACLTokenDeleteRequestType = 18
ACLPolicySetRequestType = 19
ACLPolicyDeleteRequestType = 20
ConnectCALeafRequestType = 21
ConfigEntryRequestType = 22 // FSM snapshots only.
ServiceConfigEntryRequestType = 23
ProxyConfigEntryRequestType = 24
)
const (

Loading…
Cancel
Save