From 17aa6a5a342176dde47ae47423eb1f43e879795d Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 19 Mar 2019 15:56:17 -0700 Subject: [PATCH] Fill out state store/FSM functions and add tests --- agent/consul/fsm/commands_oss.go | 20 +++ agent/consul/fsm/commands_oss_test.go | 36 ++++ agent/consul/state/config_entry.go | 157 ++++++++++++++++++ agent/consul/state/config_entry_test.go | 76 +++++++++ agent/consul/state/service_config.go | 127 -------------- .../{service_config.go => config_entry.go} | 61 +++++-- agent/structs/structs.go | 1 + 7 files changed, 337 insertions(+), 141 deletions(-) create mode 100644 agent/consul/state/config_entry.go create mode 100644 agent/consul/state/config_entry_test.go delete mode 100644 agent/consul/state/service_config.go rename agent/structs/{service_config.go => config_entry.go} (58%) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index 0e4b972f74..ac503fd5b1 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -29,6 +29,7 @@ func init() { registerCommand(structs.ACLPolicySetRequestType, (*FSM).applyACLPolicySetOperation) registerCommand(structs.ACLPolicyDeleteRequestType, (*FSM).applyACLPolicyDeleteOperation) registerCommand(structs.ConnectCALeafRequestType, (*FSM).applyConnectCALeafOperation) + registerCommand(structs.ConfigEntryRequestType, (*FSM).applyConfigEntryOperation) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -428,3 +429,22 @@ func (c *FSM) applyACLPolicyDeleteOperation(buf []byte, index uint64) interface{ return c.state.ACLPolicyBatchDelete(index, req.PolicyIDs) } + +func (c *FSM) applyConfigEntryOperation(buf []byte, index uint64) interface{} { + var req structs.ConfigEntryRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + switch req.Op { + case structs.ConfigEntryUpsert: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + []metrics.Label{{Name: "op", Value: "upsert"}}) + return c.state.EnsureConfigEntry(index, req.Entry) + case structs.ConfigEntryDelete: + defer metrics.MeasureSinceWithLabels([]string{"fsm", "config_entry"}, time.Now(), + []metrics.Label{{Name: "op", Value: "delete"}}) + return c.state.DeleteConfigEntry(req.Entry.GetKind(), req.Entry.GetName()) + default: + return fmt.Errorf("invalid config entry operation type: %v", req.Op) + } +} diff --git a/agent/consul/fsm/commands_oss_test.go b/agent/consul/fsm/commands_oss_test.go index 6778a3f7e8..d8a98644e0 100644 --- a/agent/consul/fsm/commands_oss_test.go +++ b/agent/consul/fsm/commands_oss_test.go @@ -1355,3 +1355,39 @@ func TestFSM_CABuiltinProvider(t *testing.T) { assert.Equal(expected, state) } } + +func TestFSM_ConfigEntry(t *testing.T) { + t.Parallel() + + assert := assert.New(t) + fsm, err := New(nil, os.Stderr) + assert.Nil(err) + + // Roots + entry := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + ProxyConfig: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + }, + } + + // Create a new request. + req := structs.ConfigEntryRequest{ + Op: structs.ConfigEntryUpsert, + Entry: entry, + } + + { + buf, err := structs.Encode(structs.ConfigEntryRequestType, req) + assert.Nil(err) + assert.True(fsm.Apply(makeLog(buf)).(bool)) + } + + // Verify it's in the state store. + { + _, config, err := fsm.state.ConfigEntry(structs.ProxyDefaults, "global") + assert.Nil(err) + assert.Equal(entry, config) + } +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go new file mode 100644 index 0000000000..0a1e2a70aa --- /dev/null +++ b/agent/consul/state/config_entry.go @@ -0,0 +1,157 @@ +package state + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" + memdb "github.com/hashicorp/go-memdb" +) + +const ( + configTableName = "config-entries" +) + +// configTableSchema returns a new table schema used to store global service +// and proxy configurations. +func configTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: configTableName, + Indexes: map[string]*memdb.IndexSchema{ + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{ + Field: "Kind", + Lowercase: true, + }, + &memdb.StringFieldIndex{ + Field: "Name", + Lowercase: true, + }, + }, + }, + }, + }, + } +} + +func init() { + registerSchema(configTableSchema) +} + +// ConfigEntries is used to pull all the config entries for the snapshot. +func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { + ixns, err := s.tx.Get(configTableName, "id") + if err != nil { + return nil, err + } + + var ret []structs.ConfigEntry + for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { + ret = append(ret, wrapped.(structs.ConfigEntry)) + } + + return ret, nil +} + +// Configuration is used when restoring from a snapshot. +func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { + // Insert + if err := s.tx.Insert(configTableName, c); err != nil { + return fmt.Errorf("failed restoring config entry object: %s", err) + } + if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + return nil +} + +// Configuration is called to get a given config entry. +func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Get the existing config entry. + existing, err := tx.First(configTableName, "id", kind, name) + if err != nil { + return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) + } + if existing == nil { + return 0, nil, nil + } + + conf, ok := existing.(structs.ConfigEntry) + if !ok { + return 0, nil, fmt.Errorf("config entry %q (%s) is an invalid type: %T", name, kind, conf) + } + + return idx, conf, nil +} + +// EnsureConfigEntry is called to upsert creation of a given config entry. +func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Check for existing configuration. + existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) + if err != nil { + return fmt.Errorf("failed configuration lookup: %s", err) + } + + raftIndex := conf.GetRaftIndex() + if existing != nil { + existingIdx := existing.(structs.ConfigEntry).GetRaftIndex() + raftIndex.CreateIndex = existingIdx.CreateIndex + raftIndex.ModifyIndex = existingIdx.ModifyIndex + } else { + raftIndex.CreateIndex = idx + } + raftIndex.ModifyIndex = idx + + // Insert the config entry and update the index + if err := tx.Insert(configTableName, conf); err != nil { + return fmt.Errorf("failed inserting config entry: %s", err) + } + if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Commit() + return nil +} + +func (s *Store) DeleteConfigEntry(kind, name string) error { + tx := s.db.Txn(true) + defer tx.Abort() + + // Get the index + idx := maxIndexTxn(tx, configTableName) + + // Try to retrieve the existing health check. + existing, err := tx.First(configTableName, "id", kind, name) + if err != nil { + return fmt.Errorf("failed config entry lookup: %s", err) + } + if existing == nil { + return nil + } + + // Delete the config entry from the DB and update the index. + if err := tx.Delete(configTableName, existing); err != nil { + return fmt.Errorf("failed removing check: %s", err) + } + if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { + return fmt.Errorf("failed updating index: %s", err) + } + + tx.Commit() + return nil +} diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go new file mode 100644 index 0000000000..7c8a1d0b38 --- /dev/null +++ b/agent/consul/state/config_entry_test.go @@ -0,0 +1,76 @@ +package state + +import ( + "reflect" + "testing" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestStore_ConfigEntry(t *testing.T) { + s := testStateStore(t) + + expected := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + ProxyConfig: structs.ConnectProxyConfig{ + DestinationServiceName: "foo", + }, + } + + // Create + if err := s.EnsureConfigEntry(0, expected); err != nil { + t.Fatal(err) + } + + { + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + if err != nil { + t.Fatal(err) + } + if idx != 0 { + t.Fatalf("bad: %d", idx) + } + if !reflect.DeepEqual(expected, config) { + t.Fatalf("bad: %#v, %#v", expected, config) + } + } + + // Update + updated := &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: "global", + ProxyConfig: structs.ConnectProxyConfig{ + DestinationServiceName: "bar", + }, + } + if err := s.EnsureConfigEntry(1, updated); err != nil { + t.Fatal(err) + } + + { + idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + if err != nil { + t.Fatal(err) + } + if idx != 1 { + t.Fatalf("bad: %d", idx) + } + if !reflect.DeepEqual(updated, config) { + t.Fatalf("bad: %#v, %#v", updated, config) + } + } + + // Delete + if err := s.DeleteConfigEntry(structs.ProxyDefaults, "global"); err != nil { + t.Fatal(err) + } + + _, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") + if err != nil { + t.Fatal(err) + } + if config != nil { + t.Fatalf("config should be deleted: %v", config) + } +} diff --git a/agent/consul/state/service_config.go b/agent/consul/state/service_config.go deleted file mode 100644 index 7dd56bf57a..0000000000 --- a/agent/consul/state/service_config.go +++ /dev/null @@ -1,127 +0,0 @@ -package state - -import ( - "fmt" - - "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" -) - -const ( - configTableName = "configurations" -) - -// configTableSchema returns a new table schema used to store global service -// and proxy configurations. -func configTableSchema() *memdb.TableSchema { - return &memdb.TableSchema{ - Name: configTableName, - Indexes: map[string]*memdb.IndexSchema{ - "id": &memdb.IndexSchema{ - Name: "id", - AllowMissing: false, - Unique: true, - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "Kind", - Lowercase: true, - }, - &memdb.StringFieldIndex{ - Field: "Name", - Lowercase: true, - }, - }, - }, - }, - }, - } -} - -func init() { - registerSchema(configTableSchema) -} - -// Configurations is used to pull all the configurations for the snapshot. -func (s *Snapshot) Configurations() ([]structs.Configuration, error) { - ixns, err := s.tx.Get(configTableName, "id") - if err != nil { - return nil, err - } - - var ret []structs.Configuration - for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() { - ret = append(ret, wrapped.(structs.Configuration)) - } - - return ret, nil -} - -// Configuration is used when restoring from a snapshot. -func (s *Restore) Configuration(c structs.Configuration) error { - // Insert - if err := s.tx.Insert(configTableName, c); err != nil { - return fmt.Errorf("failed restoring configuration object: %s", err) - } - if err := indexUpdateMaxTxn(s.tx, c.ModifyIndex, configTableName); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - return nil -} - -// EnsureConfiguration is called to upsert creation of a given configuration. -func (s *Store) EnsureConfiguration(idx uint64, conf structs.Configuration) error { - tx := s.db.Txn(true) - defer tx.Abort() - - // Does it make sense to validate here? We do this for service meta in the state store - // but could also do this in RPC endpoint. More version compatibility that way? - if err := conf.Validate(); err != nil { - return fmt.Errorf("failed validating config: %v", err) - } - - // Check for existing configuration. - existing, err := tx.First("configurations", "id", conf.GetKind(), conf.GetName()) - if err != nil { - return fmt.Errorf("failed configuration lookup: %s", err) - } - - if existing != nil { - conf.CreateIndex = serviceNode.CreateIndex - conf.ModifyIndex = serviceNode.ModifyIndex - } else { - conf.CreateIndex = idx - } - conf.ModifyIndex = idx - - // Insert the configuration and update the index - if err := tx.Insert("configurations", conf); err != nil { - return fmt.Errorf("failed inserting service: %s", err) - } - if err := tx.Insert("index", &IndexEntry{"configurations", idx}); err != nil { - return fmt.Errorf("failed updating index: %s", err) - } - - tx.Commit() - return nil -} - -// Configuration is called to get a given configuration. -func (s *Store) Configuration(idx uint64, kind structs.ConfigurationKind, name string) (structs.Configuration, error) { - tx := s.db.Txn(true) - defer tx.Abort() - - // Get the existing configuration. - existing, err := tx.First("configurations", "id", kind, name) - if err != nil { - return nil, fmt.Errorf("failed configuration lookup: %s", err) - } - - conf, ok := existing.(structs.Configuration) - if !ok { - return nil, fmt.Errorf("configuration %q (%s) is an invalid type: %T", name, kind, conf) - } - - return conf, nil -} diff --git a/agent/structs/service_config.go b/agent/structs/config_entry.go similarity index 58% rename from agent/structs/service_config.go rename to agent/structs/config_entry.go index 1a65108183..0d070e1195 100644 --- a/agent/structs/service_config.go +++ b/agent/structs/config_entry.go @@ -1,23 +1,26 @@ package structs -type ConfigurationKind string - const ( - ServiceDefaults ConfigurationKind = "service-defaults" - ProxyDefaults ConfigurationKind = "proxy-defaults" + ServiceDefaults string = "service-defaults" + ProxyDefaults string = "proxy-defaults" ) -// Should this be an interface or a switch on the existing config types? -type Configuration interface { - GetKind() ConfigurationKind +// ConfigEntry is the +type ConfigEntry interface { + GetKind() string GetName() string + + // This is called in the RPC endpoint and can apply defaults + Normalize() error Validate() error + + GetRaftIndex() *RaftIndex } // ServiceConfiguration is the top-level struct for the configuration of a service // across the entire cluster. -type ServiceConfiguration struct { - Kind ConfigurationKind +type ServiceConfigEntry struct { + Kind string Name string Protocol string Connect ConnectConfiguration @@ -26,7 +29,7 @@ type ServiceConfiguration struct { RaftIndex } -func (s *ServiceConfiguration) GetKind() ConfigurationKind { +func (s *ServiceConfigEntry) GetKind() string { return ServiceDefaults } @@ -63,13 +66,43 @@ type ServiceDefinitionDefaults struct { DisableDirectDiscovery bool } -// ProxyConfiguration is the top-level struct for global proxy configuration defaults. -type ProxyConfiguration struct { - Kind ConfigurationKind +// ProxyConfigEntry is the top-level struct for global proxy configuration defaults. +type ProxyConfigEntry struct { + Kind string Name string ProxyConfig ConnectProxyConfig + + RaftIndex } -func (p *ProxyConfiguration) GetKind() ConfigurationKind { +func (p *ProxyConfigEntry) GetKind() string { return ProxyDefaults } + +func (p *ProxyConfigEntry) GetName() string { + return p.Name +} + +func (p *ProxyConfigEntry) Normalize() error { + return nil +} + +func (p *ProxyConfigEntry) Validate() error { + return nil +} + +func (p *ProxyConfigEntry) GetRaftIndex() *RaftIndex { + return &p.RaftIndex +} + +type ConfigEntryOp string + +const ( + ConfigEntryUpsert ConfigEntryOp = "upsert" + ConfigEntryDelete ConfigEntryOp = "delete" +) + +type ConfigEntryRequest struct { + Op ConfigEntryOp + Entry ConfigEntry +} diff --git a/agent/structs/structs.go b/agent/structs/structs.go index f3c3a024b3..56d86ab529 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -55,6 +55,7 @@ const ( ACLPolicySetRequestType = 19 ACLPolicyDeleteRequestType = 20 ConnectCALeafRequestType = 21 + ConfigEntryRequestType = 22 ) const (