Restoring config entries updates the gateway-services table (#7811)

- Adds a new validateConfigEntryEnterprise function
- Also fixes some state store tests that were failing in enterprise
pull/7713/head
Chris Piraino 2020-05-08 13:24:33 -05:00 committed by GitHub
parent c32a4f1ece
commit 429d0cedd2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 60 additions and 30 deletions

View File

@ -244,6 +244,25 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(fsm.state.EnsureConfigEntry(18, serviceConfig, structs.DefaultEnterpriseMeta()))
require.NoError(fsm.state.EnsureConfigEntry(19, proxyConfig, structs.DefaultEnterpriseMeta()))
ingress := &structs.IngressGatewayConfigEntry{
Kind: structs.IngressGateway,
Name: "ingress",
Listeners: []structs.IngressListener{
{
Port: 8080,
Protocol: "http",
Services: []structs.IngressService{
{
Name: "foo",
},
},
},
},
}
require.NoError(fsm.state.EnsureConfigEntry(20, ingress, structs.DefaultEnterpriseMeta()))
_, gatewayServices, err := fsm.state.GatewayServices(nil, "ingress", structs.DefaultEnterpriseMeta())
require.NoError(err)
// Raft Chunking
chunkState := &raftchunking.State{
ChunkMap: make(raftchunking.ChunkMap),
@ -593,6 +612,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(err)
assert.Equal(proxyConfig, proxyConfEntry)
_, ingressRestored, err := fsm2.state.ConfigEntry(nil, structs.IngressGateway, "ingress", structs.DefaultEnterpriseMeta())
require.NoError(err)
assert.Equal(ingress, ingressRestored)
_, restoredGatewayServices, err := fsm2.state.GatewayServices(nil, "ingress", structs.DefaultEnterpriseMeta())
require.NoError(err)
require.Equal(gatewayServices, restoredGatewayServices)
newChunkState, err := fsm2.chunker.CurrentState()
require.NoError(err)
assert.Equal(newChunkState, chunkState)

View File

@ -96,15 +96,7 @@ func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {
// ConfigEntry is used when restoring from a snapshot.
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
// Insert
if err := s.tx.Insert(configTableName, c); err != nil {
return fmt.Errorf("failed restoring config entry object: %s", err)
}
if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
return s.store.insertConfigEntryWithTxn(s.tx, c.GetRaftIndex().ModifyIndex, c)
}
// ConfigEntry is called to get a given config entry.
@ -216,24 +208,11 @@ func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.Con
return err // Err is already sufficiently decorated.
}
// If the config entry is for a terminating or ingress gateway we update the memdb table
// that associates gateways <-> services.
if conf.GetKind() == structs.TerminatingGateway || conf.GetKind() == structs.IngressGateway {
err = s.updateGatewayServices(tx, idx, conf, entMeta)
if err != nil {
return fmt.Errorf("failed to associate services to gateway: %v", err)
}
if err := s.validateConfigEntryEnterprise(tx, conf); err != nil {
return err
}
// Insert the config entry and update the index
if err := s.insertConfigEntryWithTxn(tx, conf); err != nil {
return fmt.Errorf("failed inserting config entry: %s", err)
}
if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil {
return fmt.Errorf("failed updating index: %v", err)
}
return nil
return s.insertConfigEntryWithTxn(tx, idx, conf)
}
// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry.
@ -319,6 +298,30 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
return nil
}
func (s *Store) insertConfigEntryWithTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error {
if conf == nil {
return fmt.Errorf("cannot insert nil config entry")
}
// If the config entry is for a terminating or ingress gateway we update the memdb table
// that associates gateways <-> services.
if conf.GetKind() == structs.TerminatingGateway || conf.GetKind() == structs.IngressGateway {
err := s.updateGatewayServices(tx, idx, conf, conf.GetEnterpriseMeta())
if err != nil {
return fmt.Errorf("failed to associate services to gateway: %v", err)
}
}
// Insert the config entry and update the index
if err := tx.Insert(configTableName, conf); err != nil {
return fmt.Errorf("failed inserting config entry: %s", err)
}
if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil {
return fmt.Errorf("failed updating index: %v", err)
}
return nil
}
// validateProposedConfigEntryInGraph can be used to verify graph integrity for
// a proposed graph create/update/delete.
//

View File

@ -59,8 +59,8 @@ func (s *Store) firstWatchConfigEntryWithTxn(tx *memdb.Txn,
return tx.FirstWatch(configTableName, "id", kind, name)
}
func (s *Store) insertConfigEntryWithTxn(tx *memdb.Txn, conf structs.ConfigEntry) error {
return tx.Insert(configTableName, conf)
func (s *Store) validateConfigEntryEnterprise(tx *memdb.Txn, conf structs.ConfigEntry) error {
return nil
}
func getAllConfigEntriesWithTxn(tx *memdb.Txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {

View File

@ -1304,7 +1304,7 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
t.Run("default to tcp", func(t *testing.T) {
err := s.EnsureConfigEntry(0, ingress, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `service "web" has protocol "tcp"`)
require.Contains(t, err.Error(), `has protocol "tcp"`)
})
t.Run("with proxy-default", func(t *testing.T) {
@ -1319,7 +1319,7 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
err := s.EnsureConfigEntry(1, ingress, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `service "web" has protocol "http2"`)
require.Contains(t, err.Error(), `has protocol "http2"`)
})
t.Run("with service-defaults override", func(t *testing.T) {
@ -1331,7 +1331,7 @@ func TestStore_ValidateIngressGatewayErrorOnMismatchedProtocols(t *testing.T) {
require.NoError(t, s.EnsureConfigEntry(1, expected, nil))
err := s.EnsureConfigEntry(2, ingress, nil)
require.Error(t, err)
require.Contains(t, err.Error(), `service "web" has protocol "grpc"`)
require.Contains(t, err.Error(), `has protocol "grpc"`)
})
t.Run("with service-defaults correct protocol", func(t *testing.T) {