From 4998a08c5698f31656741819756bfb3dee7f0bf1 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 6 Oct 2020 10:08:37 -0500 Subject: [PATCH] server: create new memdb table for storing system metadata (#8703) This adds a new very tiny memdb table and corresponding raft operation for updating a very small effective map[string]string collection of "system metadata". This can persistently record a fact about the Consul state machine itself. The first use of this feature will come in a later PR. --- .changelog/8703.txt | 4 + agent/consul/fsm/commands_oss.go | 24 +++ agent/consul/fsm/snapshot_oss.go | 29 ++++ agent/consul/fsm/snapshot_oss_test.go | 12 ++ agent/consul/state/system_metadata.go | 193 +++++++++++++++++++++ agent/consul/state/system_metadata_test.go | 96 ++++++++++ agent/consul/system_metadata_test.go | 108 ++++++++++++ agent/structs/structs.go | 1 + agent/structs/system_metadata.go | 36 ++++ 9 files changed, 503 insertions(+) create mode 100644 .changelog/8703.txt create mode 100644 agent/consul/state/system_metadata.go create mode 100644 agent/consul/state/system_metadata_test.go create mode 100644 agent/consul/system_metadata_test.go create mode 100644 agent/structs/system_metadata.go diff --git a/.changelog/8703.txt b/.changelog/8703.txt new file mode 100644 index 0000000000..9d08692f79 --- /dev/null +++ b/.changelog/8703.txt @@ -0,0 +1,4 @@ +```release-note:feature +server: create new memdb table for storing system metadata +``` + diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 793bc83a02..b667780c26 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -37,6 +37,7 @@ func init() { registerCommand(structs.ACLAuthMethodSetRequestType, (*FSM).applyACLAuthMethodSetOperation) registerCommand(structs.ACLAuthMethodDeleteRequestType, (*FSM).applyACLAuthMethodDeleteOperation) registerCommand(structs.FederationStateRequestType, (*FSM).applyFederationStateOperation) + registerCommand(structs.SystemMetadataRequestType, (*FSM).applySystemMetadataOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -568,3 +569,26 @@ func (c *FSM) applyFederationStateOperation(buf []byte, index uint64) interface{ return fmt.Errorf("invalid federation state operation type: %v", req.Op) } } + +func (c *FSM) applySystemMetadataOperation(buf []byte, index uint64) interface{} { + var req structs.SystemMetadataRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + switch req.Op { + case structs.SystemMetadataUpsert: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(), + []metrics.Label{{Name: "op", Value: "upsert"}}) + if err := c.state.SystemMetadataSet(index, req.Entry); err != nil { + return err + } + return true + case structs.SystemMetadataDelete: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "system_metadata"}, time.Now(), + []metrics.Label{{Name: "op", Value: "delete"}}) + return c.state.SystemMetadataDelete(index, req.Entry) + default: + return fmt.Errorf("invalid system metadata operation type: %v", req.Op) + } +} diff --git a/agent/consul/fsm/snapshot_oss.go b/agent/consul/fsm/snapshot_oss.go index e1ca30b08e..b09e2cc919 100644 --- a/agent/consul/fsm/snapshot_oss.go +++ b/agent/consul/fsm/snapshot_oss.go @@ -32,6 +32,7 @@ func init() { registerRestorer(structs.ACLBindingRuleSetRequestType, restoreBindingRule) registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod) registerRestorer(structs.FederationStateRequestType, restoreFederationState) + registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata) } func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error { @@ -74,6 +75,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err if err := s.persistFederationStates(sink, encoder); err != nil { return err } + if err := s.persistSystemMetadata(sink, encoder); err != nil { + return err + } if err := s.persistIndex(sink, encoder); err != nil { return err } @@ -464,6 +468,23 @@ func (s *snapshot) persistFederationStates(sink raft.SnapshotSink, encoder *code return nil } +func (s *snapshot) persistSystemMetadata(sink raft.SnapshotSink, encoder *codec.Encoder) error { + entries, err := s.state.SystemMetadataEntries() + if err != nil { + return err + } + + for _, entry := range entries { + if _, err := sink.Write([]byte{byte(structs.SystemMetadataRequestType)}); err != nil { + return err + } + if err := encoder.Encode(entry); err != nil { + return err + } + } + return nil +} + func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder) error { // Get all the indexes iter, err := s.state.Indexes() @@ -712,3 +733,11 @@ func restoreFederationState(header *snapshotHeader, restore *state.Restore, deco } return restore.FederationState(req.State) } + +func restoreSystemMetadata(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error { + var req structs.SystemMetadataEntry + if err := decoder.Decode(&req); err != nil { + return err + } + return restore.SystemMetadataEntry(&req) +} diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index f798a0efaa..1b74d1bae5 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -399,6 +399,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { ServiceID: "web", })) + // system metadata + systemMetadataEntry := &structs.SystemMetadataEntry{ + Key: "key1", Value: "val1", + } + require.NoError(t, fsm.state.SystemMetadataSet(25, systemMetadataEntry)) + // Snapshot snap, err := fsm.Snapshot() require.NoError(t, err) @@ -660,6 +666,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { require.Equal(t, len(nodes), nodeCount) require.NotZero(t, idx) + // Verify system metadata is restored. + _, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil) + require.NoError(t, err) + require.Len(t, systemMetadataLoaded, 1) + require.Equal(t, systemMetadataEntry, systemMetadataLoaded[0]) + // Snapshot snap, err = fsm2.Snapshot() require.NoError(t, err) diff --git a/agent/consul/state/system_metadata.go b/agent/consul/state/system_metadata.go new file mode 100644 index 0000000000..52f9f43a3a --- /dev/null +++ b/agent/consul/state/system_metadata.go @@ -0,0 +1,193 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +const systemMetadataTableName = "system-metadata" + +func systemMetadataTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: systemMetadataTableName, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "Key", + Lowercase: true, + }, + }, + }, + } +} +func init() { + registerSchema(systemMetadataTableSchema) +} + +// SystemMetadataEntries used to pull all the system metadata entries for the snapshot. +func (s *Snapshot) SystemMetadataEntries() ([]*structs.SystemMetadataEntry, error) { + entries, err := s.tx.Get(systemMetadataTableName, "id") + if err != nil { + return nil, err + } + + var ret []*structs.SystemMetadataEntry + for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() { + ret = append(ret, wrapped.(*structs.SystemMetadataEntry)) + } + + return ret, nil +} + +// SystemMetadataEntry is used when restoring from a snapshot. +func (s *Restore) SystemMetadataEntry(entry *structs.SystemMetadataEntry) error { + // Insert + if err := s.tx.Insert(systemMetadataTableName, entry); err != nil { + return fmt.Errorf("failed restoring system metadata object: %s", err) + } + if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, systemMetadataTableName); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + +// SystemMetadataSet is called to do an upsert of a set of system metadata entries. +func (s *Store) SystemMetadataSet(idx uint64, entry *structs.SystemMetadataEntry) error { + tx := s.db.WriteTxn(idx) + defer tx.Abort() + + if err := systemMetadataSetTxn(tx, idx, entry); err != nil { + return err + } + + return tx.Commit() +} + +// systemMetadataSetTxn upserts a system metadata inside of a transaction. +func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntry) error { + // The only validation we care about is non-empty keys. + if entry.Key == "" { + return fmt.Errorf("missing key on system metadata") + } + + // Check for existing. + var existing *structs.SystemMetadataEntry + existingRaw, err := tx.First(systemMetadataTableName, "id", entry.Key) + if err != nil { + return fmt.Errorf("failed system metadata lookup: %s", err) + } + + if existingRaw != nil { + existing = existingRaw.(*structs.SystemMetadataEntry) + } + + // Set the indexes + if existing != nil { + entry.CreateIndex = existing.CreateIndex + entry.ModifyIndex = idx + } else { + entry.CreateIndex = idx + entry.ModifyIndex = idx + } + + // Insert the system metadata and update the index + if err := tx.Insert(systemMetadataTableName, entry); err != nil { + return fmt.Errorf("failed inserting system metadata: %s", err) + } + if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %v", err) + } + + return nil +} + +// SystemMetadataGet is called to get a system metadata. +func (s *Store) SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + return systemMetadataGetTxn(tx, ws, key) +} + +func systemMetadataGetTxn(tx ReadTxn, ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) { + // Get the index + idx := maxIndexTxn(tx, systemMetadataTableName) + + // Get the existing contents. + watchCh, existing, err := tx.FirstWatch(systemMetadataTableName, "id", key) + if err != nil { + return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err) + } + ws.Add(watchCh) + + if existing == nil { + return idx, nil, nil + } + + entry, ok := existing.(*structs.SystemMetadataEntry) + if !ok { + return 0, nil, fmt.Errorf("system metadata %q is an invalid type: %T", key, existing) + } + + return idx, entry, nil +} + +// SystemMetadataList is called to get all system metadata objects. +func (s *Store) SystemMetadataList(ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + return systemMetadataListTxn(tx, ws) +} + +func systemMetadataListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) { + // Get the index + idx := maxIndexTxn(tx, systemMetadataTableName) + + iter, err := tx.Get(systemMetadataTableName, "id") + if err != nil { + return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err) + } + ws.Add(iter.WatchCh()) + + var results []*structs.SystemMetadataEntry + for v := iter.Next(); v != nil; v = iter.Next() { + results = append(results, v.(*structs.SystemMetadataEntry)) + } + return idx, results, nil +} + +func (s *Store) SystemMetadataDelete(idx uint64, entry *structs.SystemMetadataEntry) error { + tx := s.db.WriteTxn(idx) + defer tx.Abort() + + if err := systemMetadataDeleteTxn(tx, idx, entry.Key); err != nil { + return err + } + + return tx.Commit() +} + +func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error { + // Try to retrieve the existing system metadata. + existing, err := tx.First(systemMetadataTableName, "id", key) + if err != nil { + return fmt.Errorf("failed system metadata lookup: %s", err) + } + if existing == nil { + return nil + } + + // Delete the system metadata from the DB and update the index. + if err := tx.Delete(systemMetadataTableName, existing); err != nil { + return fmt.Errorf("failed removing system metadata: %s", err) + } + if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + return nil +} diff --git a/agent/consul/state/system_metadata_test.go b/agent/consul/state/system_metadata_test.go new file mode 100644 index 0000000000..dbb196b4c3 --- /dev/null +++ b/agent/consul/state/system_metadata_test.go @@ -0,0 +1,96 @@ +package state + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" +) + +func TestStore_SystemMetadata(t *testing.T) { + s := testStateStore(t) + + mapify := func(entries []*structs.SystemMetadataEntry) map[string]string { + m := make(map[string]string) + for _, entry := range entries { + m[entry.Key] = entry.Value + } + return m + } + + checkListAndGet := func(t *testing.T, expect map[string]string) { + // List all + _, entries, err := s.SystemMetadataList(nil) + require.NoError(t, err) + require.Len(t, entries, len(expect)) + require.Equal(t, expect, mapify(entries)) + + // Read each + for expectKey, expectVal := range expect { + _, entry, err := s.SystemMetadataGet(nil, expectKey) + require.NoError(t, err) + require.NotNil(t, entry) + require.Equal(t, expectVal, entry.Value) + } + } + + checkListAndGet(t, map[string]string{}) + + var nextIndex uint64 + + // Create 3 keys + nextIndex++ + require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{ + Key: "key1", Value: "val1", + })) + nextIndex++ + require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{ + Key: "key2", Value: "val2", + })) + nextIndex++ + require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{ + Key: "key3", + })) + + checkListAndGet(t, map[string]string{ + "key1": "val1", + "key2": "val2", + "key3": "", + }) + + // Missing results are nil + _, entry, err := s.SystemMetadataGet(nil, "key4") + require.NoError(t, err) + require.Nil(t, entry) + + // Delete one that exists and one that does not + nextIndex++ + require.NoError(t, s.SystemMetadataDelete(nextIndex, &structs.SystemMetadataEntry{ + Key: "key2", + })) + nextIndex++ + require.NoError(t, s.SystemMetadataDelete(nextIndex, &structs.SystemMetadataEntry{ + Key: "key4", + })) + + checkListAndGet(t, map[string]string{ + "key1": "val1", + "key3": "", + }) + + // Update one that exists and add another one. + nextIndex++ + require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{ + Key: "key3", Value: "val3", + })) + require.NoError(t, s.SystemMetadataSet(nextIndex, &structs.SystemMetadataEntry{ + Key: "key4", Value: "val4", + })) + + checkListAndGet(t, map[string]string{ + "key1": "val1", + "key3": "val3", + "key4": "val4", + }) + +} diff --git a/agent/consul/system_metadata_test.go b/agent/consul/system_metadata_test.go new file mode 100644 index 0000000000..94f31e8f82 --- /dev/null +++ b/agent/consul/system_metadata_test.go @@ -0,0 +1,108 @@ +package consul + +import ( + "os" + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/testrpc" + "github.com/stretchr/testify/require" +) + +func TestLeader_SystemMetadata_CRUD(t *testing.T) { + // This test is a little strange because it is testing behavior that + // doesn't have an exposed RPC. We're just testing the full round trip of + // raft+fsm For now, + + dir1, srv := testServerWithConfig(t, nil) + defer os.RemoveAll(dir1) + defer srv.Shutdown() + codec := rpcClient(t, srv) + defer codec.Close() + + testrpc.WaitForLeader(t, srv.RPC, "dc1") + + state := srv.fsm.State() + + // Initially empty + _, entries, err := state.SystemMetadataList(nil) + require.NoError(t, err) + require.Len(t, entries, 0) + + // Create 3 + require.NoError(t, setSystemMetadataKey(srv, "key1", "val1")) + require.NoError(t, setSystemMetadataKey(srv, "key2", "val2")) + require.NoError(t, setSystemMetadataKey(srv, "key3", "")) + + mapify := func(entries []*structs.SystemMetadataEntry) map[string]string { + m := make(map[string]string) + for _, entry := range entries { + m[entry.Key] = entry.Value + } + return m + } + + _, entries, err = state.SystemMetadataList(nil) + require.NoError(t, err) + require.Len(t, entries, 3) + + require.Equal(t, map[string]string{ + "key1": "val1", + "key2": "val2", + "key3": "", + }, mapify(entries)) + + // Update one and delete one. + require.NoError(t, setSystemMetadataKey(srv, "key3", "val3")) + require.NoError(t, deleteSystemMetadataKey(srv, "key1")) + + _, entries, err = state.SystemMetadataList(nil) + require.NoError(t, err) + require.Len(t, entries, 2) + + require.Equal(t, map[string]string{ + "key2": "val2", + "key3": "val3", + }, mapify(entries)) +} + +// Note when this behavior is actually used, consider promoting these 2 +// functions out of test code. + +func setSystemMetadataKey(s *Server, key, val string) error { + args := &structs.SystemMetadataRequest{ + Op: structs.SystemMetadataUpsert, + Entry: &structs.SystemMetadataEntry{ + Key: key, Value: val, + }, + } + + resp, err := s.raftApply(structs.SystemMetadataRequestType, args) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + return nil +} + +func deleteSystemMetadataKey(s *Server, key string) error { + args := &structs.SystemMetadataRequest{ + Op: structs.SystemMetadataDelete, + Entry: &structs.SystemMetadataEntry{ + Key: key, + }, + } + + resp, err := s.raftApply(structs.SystemMetadataRequestType, args) + if err != nil { + return err + } + if respErr, ok := resp.(error); ok { + return respErr + } + + return nil +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index e133a0cac0..ca730e7449 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -68,6 +68,7 @@ const ( ACLAuthMethodDeleteRequestType = 28 ChunkingStateType = 29 FederationStateRequestType = 30 + SystemMetadataRequestType = 31 ) const ( diff --git a/agent/structs/system_metadata.go b/agent/structs/system_metadata.go new file mode 100644 index 0000000000..32e519c313 --- /dev/null +++ b/agent/structs/system_metadata.go @@ -0,0 +1,36 @@ +package structs + +// SystemMetadataOp is the operation for a request related to system metadata. +type SystemMetadataOp string + +const ( + SystemMetadataUpsert SystemMetadataOp = "upsert" + SystemMetadataDelete SystemMetadataOp = "delete" +) + +// SystemMetadataRequest is used to upsert and delete system metadata. +type SystemMetadataRequest struct { + // Datacenter is the target for this request. + Datacenter string + + // Op is the type of operation being requested. + Op SystemMetadataOp + + // Entry is the key to modify. + Entry *SystemMetadataEntry + + // WriteRequest is a common struct containing ACL tokens and other + // write-related common elements for requests. + WriteRequest +} + +type SystemMetadataEntry struct { + Key string + Value string `json:",omitempty"` + RaftIndex +} + +// RequestDatacenter returns the datacenter for a given request. +func (c *SystemMetadataRequest) RequestDatacenter() string { + return c.Datacenter +}