server: create new memdb table for storing system metadata (#8703)

This adds a new very tiny memdb table and corresponding raft operation
for updating a very small effective map[string]string collection of
"system metadata". This can persistently record a fact about the Consul
state machine itself.

The first use of this feature will come in a later PR.
pull/8833/head
R.B. Boyer 2020-10-06 10:08:37 -05:00 committed by GitHub
parent 3fe95f17d5
commit 4998a08c56
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 503 additions and 0 deletions

4
.changelog/8703.txt Normal file
View File

@ -0,0 +1,4 @@
```release-note:feature
server: create new memdb table for storing system metadata
```

View File

@ -37,6 +37,7 @@ func init() {
registerCommand(structs.ACLAuthMethodSetRequestType, (*FSM).applyACLAuthMethodSetOperation)
registerCommand(structs.ACLAuthMethodDeleteRequestType, (*FSM).applyACLAuthMethodDeleteOperation)
registerCommand(structs.FederationStateRequestType, (*FSM).applyFederationStateOperation)
registerCommand(structs.SystemMetadataRequestType, (*FSM).applySystemMetadataOperation)
}
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -568,3 +569,26 @@ func (c *FSM) applyFederationStateOperation(buf []byte, index uint64) interface{
return fmt.Errorf("invalid federation state operation type: %v", req.Op)
}
}
func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{} {
var req structs.SystemMetadataRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
switch req.Op {
case structs.SystemMetadataUpsert:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(),
[]metrics.Label{{Name: "op", Value: "upsert"}})
if err := c.state.SystemMetadataSet(index, req.Entry); err != nil {
return err
}
return true
case structs.SystemMetadataDelete:
defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(),
[]metrics.Label{{Name: "op", Value: "delete"}})
return c.state.SystemMetadataDelete(index, req.Entry)
default:
return fmt.Errorf("invalid system metadata operation type: %v", req.Op)
}
}

View File

@ -32,6 +32,7 @@ func init() {
registerRestorer(structs.ACLBindingRuleSetRequestType, restoreBindingRule)
registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod)
registerRestorer(structs.FederationStateRequestType, restoreFederationState)
registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata)
}
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
@ -74,6 +75,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
if err := s.persistFederationStates(sink, encoder); err != nil {
return err
}
if err := s.persistSystemMetadata(sink, encoder); err != nil {
return err
}
if err := s.persistIndex(sink, encoder); err != nil {
return err
}
@ -464,6 +468,23 @@ func (s *snapshot) persistFederationStates(sink raft.SnapshotSink, encoder *code
return nil
}
func (s *snapshot) persistSystemMetadata(sink raft.SnapshotSink, encoder *codec.Encoder) error {
entries, err := s.state.SystemMetadataEntries()
if err != nil {
return err
}
for _, entry := range entries {
if _, err := sink.Write([]byte{byte(structs.SystemMetadataRequestType)}); err != nil {
return err
}
if err := encoder.Encode(entry); err != nil {
return err
}
}
return nil
}
func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error {
// Get all the indexes
iter, err := s.state.Indexes()
@ -712,3 +733,11 @@ func restoreFederationState(header *snapshotHeader, restore *state.Restore, deco
}
return restore.FederationState(req.State)
}
func restoreSystemMetadata(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.SystemMetadataEntry
if err := decoder.Decode(&req); err != nil {
return err
}
return restore.SystemMetadataEntry(&req)
}

View File

@ -399,6 +399,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
ServiceID: "web",
}))
// system metadata
systemMetadataEntry := &structs.SystemMetadataEntry{
Key: "key1", Value: "val1",
}
require.NoError(t, fsm.state.SystemMetadataSet(25, systemMetadataEntry))
// Snapshot
snap, err := fsm.Snapshot()
require.NoError(t, err)
@ -660,6 +666,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Equal(t, len(nodes), nodeCount)
require.NotZero(t, idx)
// Verify system metadata is restored.
_, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, systemMetadataLoaded, 1)
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[0])
// Snapshot
snap, err = fsm2.Snapshot()
require.NoError(t, err)

View File

@ -0,0 +1,193 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)
const systemMetadataTableName = "system-metadata"
func systemMetadataTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: systemMetadataTableName,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "Key",
Lowercase: true,
},
},
},
}
}
func init() {
registerSchema(systemMetadataTableSchema)
}
// SystemMetadataEntries used to pull all the system metadata entries for the snapshot.
func (s *Snapshot) SystemMetadataEntries() ([]*structs.SystemMetadataEntry, error) {
entries, err := s.tx.Get(systemMetadataTableName, "id")
if err != nil {
return nil, err
}
var ret []*structs.SystemMetadataEntry
for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() {
ret = append(ret, wrapped.(*structs.SystemMetadataEntry))
}
return ret, nil
}
// SystemMetadataEntry is used when restoring from a snapshot.
func (s *Restore) SystemMetadataEntry(entry *structs.SystemMetadataEntry) error {
// Insert
if err := s.tx.Insert(systemMetadataTableName, entry); err != nil {
return fmt.Errorf("failed restoring system metadata object: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, systemMetadataTableName); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// SystemMetadataSet is called to do an upsert of a set of system metadata entries.
func (s *Store) SystemMetadataSet(idx uint64, entry *structs.SystemMetadataEntry) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
if err := systemMetadataSetTxn(tx, idx, entry); err != nil {
return err
}
return tx.Commit()
}
// systemMetadataSetTxn upserts a system metadata inside of a transaction.
func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntry) error {
// The only validation we care about is non-empty keys.
if entry.Key == "" {
return fmt.Errorf("missing key on system metadata")
}
// Check for existing.
var existing *structs.SystemMetadataEntry
existingRaw, err := tx.First(systemMetadataTableName, "id", entry.Key)
if err != nil {
return fmt.Errorf("failed system metadata lookup: %s", err)
}
if existingRaw != nil {
existing = existingRaw.(*structs.SystemMetadataEntry)
}
// Set the indexes
if existing != nil {
entry.CreateIndex = existing.CreateIndex
entry.ModifyIndex = idx
} else {
entry.CreateIndex = idx
entry.ModifyIndex = idx
}
// Insert the system metadata and update the index
if err := tx.Insert(systemMetadataTableName, entry); err != nil {
return fmt.Errorf("failed inserting system metadata: %s", err)
}
if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil {
return fmt.Errorf("failed updating index: %v", err)
}
return nil
}
// SystemMetadataGet is called to get a system metadata.
func (s *Store) SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return systemMetadataGetTxn(tx, ws, key)
}
func systemMetadataGetTxn(tx ReadTxn, ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) {
// Get the index
idx := maxIndexTxn(tx, systemMetadataTableName)
// Get the existing contents.
watchCh, existing, err := tx.FirstWatch(systemMetadataTableName, "id", key)
if err != nil {
return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err)
}
ws.Add(watchCh)
if existing == nil {
return idx, nil, nil
}
entry, ok := existing.(*structs.SystemMetadataEntry)
if !ok {
return 0, nil, fmt.Errorf("system metadata %q is an invalid type: %T", key, existing)
}
return idx, entry, nil
}
// SystemMetadataList is called to get all system metadata objects.
func (s *Store) SystemMetadataList(ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
return systemMetadataListTxn(tx, ws)
}
func systemMetadataListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) {
// Get the index
idx := maxIndexTxn(tx, systemMetadataTableName)
iter, err := tx.Get(systemMetadataTableName, "id")
if err != nil {
return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err)
}
ws.Add(iter.WatchCh())
var results []*structs.SystemMetadataEntry
for v := iter.Next(); v != nil; v = iter.Next() {
results = append(results, v.(*structs.SystemMetadataEntry))
}
return idx, results, nil
}
func (s *Store) SystemMetadataDelete(idx uint64, entry *structs.SystemMetadataEntry) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
if err := systemMetadataDeleteTxn(tx, idx, entry.Key); err != nil {
return err
}
return tx.Commit()
}
func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error {
// Try to retrieve the existing system metadata.
existing, err := tx.First(systemMetadataTableName, "id", key)
if err != nil {
return fmt.Errorf("failed system metadata lookup: %s", err)
}
if existing == nil {
return nil
}
// Delete the system metadata from the DB and update the index.
if err := tx.Delete(systemMetadataTableName, existing); err != nil {
return fmt.Errorf("failed removing system metadata: %s", err)
}
if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}

View File

@ -0,0 +1,96 @@
package state
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestStore_SystemMetadata(t *testing.T) {
s := testStateStore(t)
mapify := func(entries []*structs.SystemMetadataEntry) map[string]string {
m := make(map[string]string)
for _, entry := range entries {
m[entry.Key] = entry.Value
}
return m
}
checkListAndGet := func(t *testing.T, expect map[string]string) {
// List all
_, entries, err := s.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, entries, len(expect))
require.Equal(t, expect, mapify(entries))
// Read each
for expectKey, expectVal := range expect {
_, entry, err := s.SystemMetadataGet(nil, expectKey)
require.NoError(t, err)
require.NotNil(t, entry)
require.Equal(t, expectVal, entry.Value)
}
}
checkListAndGet(t, map[string]string{})
var nextIndex uint64
// Create 3 keys
nextIndex++
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
Key: "key1", Value: "val1",
}))
nextIndex++
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
Key: "key2", Value: "val2",
}))
nextIndex++
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
Key: "key3",
}))
checkListAndGet(t, map[string]string{
"key1": "val1",
"key2": "val2",
"key3": "",
})
// Missing results are nil
_, entry, err := s.SystemMetadataGet(nil, "key4")
require.NoError(t, err)
require.Nil(t, entry)
// Delete one that exists and one that does not
nextIndex++
require.NoError(t, s.SystemMetadataDelete(nextIndex, &structs.SystemMetadataEntry{
Key: "key2",
}))
nextIndex++
require.NoError(t, s.SystemMetadataDelete(nextIndex, &structs.SystemMetadataEntry{
Key: "key4",
}))
checkListAndGet(t, map[string]string{
"key1": "val1",
"key3": "",
})
// Update one that exists and add another one.
nextIndex++
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
Key: "key3", Value: "val3",
}))
require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{
Key: "key4", Value: "val4",
}))
checkListAndGet(t, map[string]string{
"key1": "val1",
"key3": "val3",
"key4": "val4",
})
}

View File

@ -0,0 +1,108 @@
package consul
import (
"os"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)
func TestLeader_SystemMetadata_CRUD(t *testing.T) {
// This test is a little strange because it is testing behavior that
// doesn't have an exposed RPC. We're just testing the full round trip of
// raft+fsm For now,
dir1, srv := testServerWithConfig(t, nil)
defer os.RemoveAll(dir1)
defer srv.Shutdown()
codec := rpcClient(t, srv)
defer codec.Close()
testrpc.WaitForLeader(t, srv.RPC, "dc1")
state := srv.fsm.State()
// Initially empty
_, entries, err := state.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, entries, 0)
// Create 3
require.NoError(t, setSystemMetadataKey(srv, "key1", "val1"))
require.NoError(t, setSystemMetadataKey(srv, "key2", "val2"))
require.NoError(t, setSystemMetadataKey(srv, "key3", ""))
mapify := func(entries []*structs.SystemMetadataEntry) map[string]string {
m := make(map[string]string)
for _, entry := range entries {
m[entry.Key] = entry.Value
}
return m
}
_, entries, err = state.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, entries, 3)
require.Equal(t, map[string]string{
"key1": "val1",
"key2": "val2",
"key3": "",
}, mapify(entries))
// Update one and delete one.
require.NoError(t, setSystemMetadataKey(srv, "key3", "val3"))
require.NoError(t, deleteSystemMetadataKey(srv, "key1"))
_, entries, err = state.SystemMetadataList(nil)
require.NoError(t, err)
require.Len(t, entries, 2)
require.Equal(t, map[string]string{
"key2": "val2",
"key3": "val3",
}, mapify(entries))
}
// Note when this behavior is actually used, consider promoting these 2
// functions out of test code.
func setSystemMetadataKey(s *Server, key, val string) error {
args := &structs.SystemMetadataRequest{
Op: structs.SystemMetadataUpsert,
Entry: &structs.SystemMetadataEntry{
Key: key, Value: val,
},
}
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
func deleteSystemMetadataKey(s *Server, key string) error {
args := &structs.SystemMetadataRequest{
Op: structs.SystemMetadataDelete,
Entry: &structs.SystemMetadataEntry{
Key: key,
},
}
resp, err := s.raftApply(structs.SystemMetadataRequestType, args)
if err != nil {
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}

View File

@ -68,6 +68,7 @@ const (
ACLAuthMethodDeleteRequestType = 28
ChunkingStateType = 29
FederationStateRequestType = 30
SystemMetadataRequestType = 31
)
const (

View File

@ -0,0 +1,36 @@
package structs
// SystemMetadataOp is the operation for a request related to system metadata.
type SystemMetadataOp string
const (
SystemMetadataUpsert SystemMetadataOp = "upsert"
SystemMetadataDelete SystemMetadataOp = "delete"
)
// SystemMetadataRequest is used to upsert and delete system metadata.
type SystemMetadataRequest struct {
// Datacenter is the target for this request.
Datacenter string
// Op is the type of operation being requested.
Op SystemMetadataOp
// Entry is the key to modify.
Entry *SystemMetadataEntry
// WriteRequest is a common struct containing ACL tokens and other
// write-related common elements for requests.
WriteRequest
}
type SystemMetadataEntry struct {
Key string
Value string `json:",omitempty"`
RaftIndex
}
// RequestDatacenter returns the datacenter for a given request.
func (c *SystemMetadataRequest) RequestDatacenter() string {
return c.Datacenter
}