mirror of https://github.com/hashicorp/consul
Encode config entry FSM messages in a generic type
parent
f6df5c9b3b
commit
d16be2e269
|
@ -29,8 +29,7 @@ func init() {
|
||||||
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
|
registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation)
|
||||||
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
|
registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation)
|
||||||
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
|
registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation)
|
||||||
registerCommand(structs.ServiceConfigEntryRequestType, (*FSM).applyServiceConfigEntryOperation)
|
registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation)
|
||||||
registerCommand(structs.ProxyConfigEntryRequestType, (*FSM).applyProxyConfigEntryOperation)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
|
||||||
|
@ -431,30 +430,14 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{
|
||||||
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
|
return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *FSM) applyServiceConfigEntryOperation(buf []byte, index uint64) interface{} {
|
func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} {
|
||||||
req := structs.ConfigEntryRequest{
|
|
||||||
Entry: &structs.ServiceConfigEntry{},
|
|
||||||
}
|
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
|
||||||
}
|
|
||||||
return c.applyConfigEntryOperation(index, req)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyProxyConfigEntryOperation(buf []byte, index uint64) interface{} {
|
|
||||||
req := structs.ConfigEntryRequest{
|
req := structs.ConfigEntryRequest{
|
||||||
Entry: &structs.ProxyConfigEntry{},
|
Entry: &structs.ProxyConfigEntry{},
|
||||||
}
|
}
|
||||||
if err := structs.Decode(buf, &req); err != nil {
|
if err := structs.Decode(buf, &req); err != nil {
|
||||||
panic(fmt.Errorf("failed to decode request: %v", err))
|
panic(fmt.Errorf("failed to decode request: %v", err))
|
||||||
}
|
}
|
||||||
if err := c.applyConfigEntryOperation(index, req); err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *FSM) applyConfigEntryOperation(index uint64, req structs.ConfigEntryRequest) error {
|
|
||||||
switch req.Op {
|
switch req.Op {
|
||||||
case structs.ConfigEntryUpsert:
|
case structs.ConfigEntryUpsert:
|
||||||
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
|
defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry", req.Entry.GetKind()}, time.Now(),
|
||||||
|
|
|
@ -1368,21 +1368,24 @@ func TestFSM_ConfigEntry(t *testing.T) {
|
||||||
entry := &structs.ProxyConfigEntry{
|
entry := &structs.ProxyConfigEntry{
|
||||||
Kind: structs.ProxyDefaults,
|
Kind: structs.ProxyDefaults,
|
||||||
Name: "global",
|
Name: "global",
|
||||||
ProxyConfig: structs.ConnectProxyConfig{
|
Config: map[string]interface{}{
|
||||||
DestinationServiceName: "foo",
|
"DestinationServiceName": "foo",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new request.
|
// Create a new request.
|
||||||
req := structs.ConfigEntryRequest{
|
req := &structs.ConfigEntryRequest{
|
||||||
Op: structs.ConfigEntryUpsert,
|
Op: structs.ConfigEntryUpsert,
|
||||||
Entry: entry,
|
Entry: entry,
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
buf, err := structs.Encode(structs.ProxyConfigEntryRequestType, req)
|
buf, err := structs.Encode(structs.ConfigEntryRequestType, req)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.True(fsm.Apply(makeLog(buf)).(bool))
|
resp := fsm.Apply(makeLog(buf))
|
||||||
|
if _, ok := resp.(error); ok {
|
||||||
|
t.Fatalf("bad: %v", resp)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify it's in the state store.
|
// Verify it's in the state store.
|
||||||
|
@ -1391,6 +1394,14 @@ func TestFSM_ConfigEntry(t *testing.T) {
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
entry.RaftIndex.CreateIndex = 1
|
entry.RaftIndex.CreateIndex = 1
|
||||||
entry.RaftIndex.ModifyIndex = 1
|
entry.RaftIndex.ModifyIndex = 1
|
||||||
|
|
||||||
|
proxyConf, ok := config.(*structs.ProxyConfigEntry)
|
||||||
|
require.True(ok)
|
||||||
|
|
||||||
|
// Read the map[string]interface{} back out.
|
||||||
|
value, _ := proxyConf.Config["DestinationServiceName"].([]uint8)
|
||||||
|
proxyConf.Config["DestinationServiceName"] = structs.Uint8ToString(value)
|
||||||
|
|
||||||
require.Equal(entry, config)
|
require.Equal(entry, config)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
package fsm
|
package fsm
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/consul/autopilot"
|
"github.com/hashicorp/consul/agent/consul/autopilot"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
@ -377,10 +375,10 @@ func (s *snapshot) persistConfigEntries(sink raft.SnapshotSink,
|
||||||
if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil {
|
if _, err := sink.Write([]byte{byte(structs.ConfigEntryRequestType)}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := encoder.Encode(entry.GetKind()); err != nil {
|
req := &structs.ConfigEntryRequest{
|
||||||
return err
|
Entry: entry,
|
||||||
}
|
}
|
||||||
if err := encoder.Encode(entry); err != nil {
|
if err := encoder.Encode(req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -594,23 +592,9 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code
|
||||||
}
|
}
|
||||||
|
|
||||||
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||||
var req structs.ConfigEntry
|
var req structs.ConfigEntryRequest
|
||||||
var kind string
|
|
||||||
if err := decoder.Decode(&kind); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
switch kind {
|
|
||||||
case structs.ServiceDefaults:
|
|
||||||
req = &structs.ServiceConfigEntry{}
|
|
||||||
case structs.ProxyDefaults:
|
|
||||||
req = &structs.ProxyConfigEntry{}
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("invalid config type: %s", kind)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := decoder.Decode(&req); err != nil {
|
if err := decoder.Decode(&req); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return restore.ConfigEntry(req)
|
return restore.ConfigEntry(req.Entry)
|
||||||
}
|
}
|
||||||
|
|
|
@ -317,8 +317,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
||||||
// Verify ACL Token is restored
|
// Verify ACL Token is restored
|
||||||
_, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID)
|
_, a, err := fsm2.state.ACLTokenGetByAccessor(nil, token.AccessorID)
|
||||||
require.NoError(err)
|
require.NoError(err)
|
||||||
require.Equal(t, token.AccessorID, a.AccessorID)
|
require.Equal(token.AccessorID, a.AccessorID)
|
||||||
require.Equal(t, token.ModifyIndex, a.ModifyIndex)
|
require.Equal(token.ModifyIndex, a.ModifyIndex)
|
||||||
|
|
||||||
// Verify the acl-token-bootstrap index was restored
|
// Verify the acl-token-bootstrap index was restored
|
||||||
canBootstrap, index, err := fsm2.state.CanBootstrapACLToken()
|
canBootstrap, index, err := fsm2.state.CanBootstrapACLToken()
|
||||||
|
|
|
@ -3,6 +3,8 @@ package structs
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -163,3 +165,62 @@ type ConfigEntryRequest struct {
|
||||||
Op ConfigEntryOp
|
Op ConfigEntryOp
|
||||||
Entry ConfigEntry
|
Entry ConfigEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) {
|
||||||
|
// bs will grow if needed but allocate enough to avoid reallocation in common
|
||||||
|
// case.
|
||||||
|
bs := make([]byte, 128)
|
||||||
|
enc := codec.NewEncoderBytes(&bs, msgpackHandle)
|
||||||
|
// Encode kind first
|
||||||
|
err = enc.Encode(r.Entry.GetKind())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
// Then actual value using alias trick to avoid infinite recursion
|
||||||
|
type Alias ConfigEntryRequest
|
||||||
|
err = enc.Encode(struct {
|
||||||
|
*Alias
|
||||||
|
}{
|
||||||
|
Alias: (*Alias)(r),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return bs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error {
|
||||||
|
// First decode the kind prefix
|
||||||
|
var kind string
|
||||||
|
dec := codec.NewDecoderBytes(data, msgpackHandle)
|
||||||
|
if err := dec.Decode(&kind); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then decode the real thing with appropriate kind of ConfigEntry
|
||||||
|
r.Entry = makeConfigEntry(kind)
|
||||||
|
|
||||||
|
// Alias juggling to prevent infinite recursive calls back to this decode
|
||||||
|
// method.
|
||||||
|
type Alias ConfigEntryRequest
|
||||||
|
as := struct {
|
||||||
|
*Alias
|
||||||
|
}{
|
||||||
|
Alias: (*Alias)(r),
|
||||||
|
}
|
||||||
|
if err := dec.Decode(&as); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeConfigEntry(kind string) ConfigEntry {
|
||||||
|
switch kind {
|
||||||
|
case ServiceDefaults:
|
||||||
|
return &ServiceConfigEntry{}
|
||||||
|
case ProxyDefaults:
|
||||||
|
return &ProxyConfigEntry{}
|
||||||
|
default:
|
||||||
|
panic("invalid kind")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -56,8 +56,6 @@ const (
|
||||||
ACLPolicyDeleteRequestType = 20
|
ACLPolicyDeleteRequestType = 20
|
||||||
ConnectCALeafRequestType = 21
|
ConnectCALeafRequestType = 21
|
||||||
ConfigEntryRequestType = 22 // FSM snapshots only.
|
ConfigEntryRequestType = 22 // FSM snapshots only.
|
||||||
ServiceConfigEntryRequestType = 23
|
|
||||||
ProxyConfigEntryRequestType = 24
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
Loading…
Reference in New Issue