mirror of https://github.com/hashicorp/consul
Kyle Havlovitz
6 years ago
committed by
GitHub
8 changed files with 760 additions and 19 deletions
@ -0,0 +1,240 @@
|
||||
package state |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/hashicorp/consul/agent/structs" |
||||
memdb "github.com/hashicorp/go-memdb" |
||||
) |
||||
|
||||
const ( |
||||
configTableName = "config-entries" |
||||
) |
||||
|
||||
// configTableSchema returns a new table schema used to store global
|
||||
// config entries.
|
||||
func configTableSchema() *memdb.TableSchema { |
||||
return &memdb.TableSchema{ |
||||
Name: configTableName, |
||||
Indexes: map[string]*memdb.IndexSchema{ |
||||
"id": &memdb.IndexSchema{ |
||||
Name: "id", |
||||
AllowMissing: false, |
||||
Unique: true, |
||||
Indexer: &memdb.CompoundIndex{ |
||||
Indexes: []memdb.Indexer{ |
||||
&memdb.StringFieldIndex{ |
||||
Field: "Kind", |
||||
Lowercase: true, |
||||
}, |
||||
&memdb.StringFieldIndex{ |
||||
Field: "Name", |
||||
Lowercase: true, |
||||
}, |
||||
}, |
||||
}, |
||||
}, |
||||
"kind": &memdb.IndexSchema{ |
||||
Name: "kind", |
||||
AllowMissing: false, |
||||
Unique: true, |
||||
Indexer: &memdb.StringFieldIndex{ |
||||
Field: "Kind", |
||||
Lowercase: true, |
||||
}, |
||||
}, |
||||
}, |
||||
} |
||||
} |
||||
|
||||
func init() { |
||||
registerSchema(configTableSchema) |
||||
} |
||||
|
||||
// ConfigEntries is used to pull all the config entries for the snapshot.
|
||||
func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) { |
||||
entries, err := s.tx.Get(configTableName, "id") |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
var ret []structs.ConfigEntry |
||||
for wrapped := entries.Next(); wrapped != nil; wrapped = entries.Next() { |
||||
ret = append(ret, wrapped.(structs.ConfigEntry)) |
||||
} |
||||
|
||||
return ret, nil |
||||
} |
||||
|
||||
// ConfigEntry is used when restoring from a snapshot.
|
||||
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error { |
||||
// Insert
|
||||
if err := s.tx.Insert(configTableName, c); err != nil { |
||||
return fmt.Errorf("failed restoring config entry object: %s", err) |
||||
} |
||||
if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil { |
||||
return fmt.Errorf("failed updating index: %s", err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// ConfigEntry is called to get a given config entry.
|
||||
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) { |
||||
tx := s.db.Txn(false) |
||||
defer tx.Abort() |
||||
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, configTableName) |
||||
|
||||
// Get the existing config entry.
|
||||
existing, err := tx.First(configTableName, "id", kind, name) |
||||
if err != nil { |
||||
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) |
||||
} |
||||
if existing == nil { |
||||
return idx, nil, nil |
||||
} |
||||
|
||||
conf, ok := existing.(structs.ConfigEntry) |
||||
if !ok { |
||||
return 0, nil, fmt.Errorf("config entry %q (%s) is an invalid type: %T", name, kind, conf) |
||||
} |
||||
|
||||
return idx, conf, nil |
||||
} |
||||
|
||||
// ConfigEntries is called to get all config entry objects.
|
||||
func (s *Store) ConfigEntries() (uint64, []structs.ConfigEntry, error) { |
||||
return s.ConfigEntriesByKind("") |
||||
} |
||||
|
||||
// ConfigEntriesByKind is called to get all config entry objects with the given kind.
|
||||
// If kind is empty, all config entries will be returned.
|
||||
func (s *Store) ConfigEntriesByKind(kind string) (uint64, []structs.ConfigEntry, error) { |
||||
tx := s.db.Txn(false) |
||||
defer tx.Abort() |
||||
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, configTableName) |
||||
|
||||
// Lookup by kind, or all if kind is empty
|
||||
var iter memdb.ResultIterator |
||||
var err error |
||||
if kind != "" { |
||||
iter, err = tx.Get(configTableName, "kind", kind) |
||||
} else { |
||||
iter, err = tx.Get(configTableName, "id") |
||||
} |
||||
if err != nil { |
||||
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err) |
||||
} |
||||
|
||||
var results []structs.ConfigEntry |
||||
for v := iter.Next(); v != nil; v = iter.Next() { |
||||
results = append(results, v.(structs.ConfigEntry)) |
||||
} |
||||
return idx, results, nil |
||||
} |
||||
|
||||
// EnsureConfigEntry is called to do an upsert of a given config entry.
|
||||
func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error { |
||||
tx := s.db.Txn(true) |
||||
defer tx.Abort() |
||||
|
||||
if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { |
||||
return err |
||||
} |
||||
|
||||
tx.Commit() |
||||
return nil |
||||
} |
||||
|
||||
// ensureConfigEntryTxn upserts a config entry inside of a transaction.
|
||||
func (s *Store) ensureConfigEntryTxn(tx *memdb.Txn, idx uint64, conf structs.ConfigEntry) error { |
||||
// Check for existing configuration.
|
||||
existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) |
||||
if err != nil { |
||||
return fmt.Errorf("failed configuration lookup: %s", err) |
||||
} |
||||
|
||||
raftIndex := conf.GetRaftIndex() |
||||
if existing != nil { |
||||
existingIdx := existing.(structs.ConfigEntry).GetRaftIndex() |
||||
raftIndex.CreateIndex = existingIdx.CreateIndex |
||||
raftIndex.ModifyIndex = existingIdx.ModifyIndex |
||||
} else { |
||||
raftIndex.CreateIndex = idx |
||||
} |
||||
raftIndex.ModifyIndex = idx |
||||
|
||||
// Insert the config entry and update the index
|
||||
if err := tx.Insert(configTableName, conf); err != nil { |
||||
return fmt.Errorf("failed inserting config entry: %s", err) |
||||
} |
||||
if err := indexUpdateMaxTxn(tx, idx, configTableName); err != nil { |
||||
return fmt.Errorf("failed updating index: %v", err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// EnsureConfigEntryCAS is called to do a check-and-set upsert of a given config entry.
|
||||
func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry) (bool, error) { |
||||
tx := s.db.Txn(true) |
||||
defer tx.Abort() |
||||
|
||||
// Check for existing configuration.
|
||||
existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName()) |
||||
if err != nil { |
||||
return false, fmt.Errorf("failed configuration lookup: %s", err) |
||||
} |
||||
|
||||
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||
// we are doing a set-if-not-exists.
|
||||
var existingIdx structs.RaftIndex |
||||
if existing != nil { |
||||
existingIdx = *existing.(structs.ConfigEntry).GetRaftIndex() |
||||
} |
||||
if cidx == 0 && existing != nil { |
||||
return false, nil |
||||
} |
||||
if cidx != 0 && existing == nil { |
||||
return false, nil |
||||
} |
||||
if existing != nil && cidx != 0 && cidx != existingIdx.ModifyIndex { |
||||
return false, nil |
||||
} |
||||
|
||||
if err := s.ensureConfigEntryTxn(tx, idx, conf); err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
tx.Commit() |
||||
return true, nil |
||||
} |
||||
|
||||
func (s *Store) DeleteConfigEntry(idx uint64, kind, name string) error { |
||||
tx := s.db.Txn(true) |
||||
defer tx.Abort() |
||||
|
||||
// Try to retrieve the existing health check.
|
||||
existing, err := tx.First(configTableName, "id", kind, name) |
||||
if err != nil { |
||||
return fmt.Errorf("failed config entry lookup: %s", err) |
||||
} |
||||
if existing == nil { |
||||
return nil |
||||
} |
||||
|
||||
// Delete the config entry from the DB and update the index.
|
||||
if err := tx.Delete(configTableName, existing); err != nil { |
||||
return fmt.Errorf("failed removing check: %s", err) |
||||
} |
||||
if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil { |
||||
return fmt.Errorf("failed updating index: %s", err) |
||||
} |
||||
|
||||
tx.Commit() |
||||
return nil |
||||
} |
@ -0,0 +1,138 @@
|
||||
package state |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/hashicorp/consul/agent/structs" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestStore_ConfigEntry(t *testing.T) { |
||||
require := require.New(t) |
||||
s := testStateStore(t) |
||||
|
||||
expected := &structs.ProxyConfigEntry{ |
||||
Kind: structs.ProxyDefaults, |
||||
Name: "global", |
||||
Config: map[string]interface{}{ |
||||
"DestinationServiceName": "foo", |
||||
}, |
||||
} |
||||
|
||||
// Create
|
||||
require.NoError(s.EnsureConfigEntry(0, expected)) |
||||
|
||||
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") |
||||
require.NoError(err) |
||||
require.Equal(uint64(0), idx) |
||||
require.Equal(expected, config) |
||||
|
||||
// Update
|
||||
updated := &structs.ProxyConfigEntry{ |
||||
Kind: structs.ProxyDefaults, |
||||
Name: "global", |
||||
Config: map[string]interface{}{ |
||||
"DestinationServiceName": "bar", |
||||
}, |
||||
} |
||||
require.NoError(s.EnsureConfigEntry(1, updated)) |
||||
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") |
||||
require.NoError(err) |
||||
require.Equal(uint64(1), idx) |
||||
require.Equal(updated, config) |
||||
|
||||
// Delete
|
||||
require.NoError(s.DeleteConfigEntry(2, structs.ProxyDefaults, "global")) |
||||
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") |
||||
require.NoError(err) |
||||
require.Equal(uint64(2), idx) |
||||
require.Nil(config) |
||||
} |
||||
|
||||
func TestStore_ConfigEntryCAS(t *testing.T) { |
||||
require := require.New(t) |
||||
s := testStateStore(t) |
||||
|
||||
expected := &structs.ProxyConfigEntry{ |
||||
Kind: structs.ProxyDefaults, |
||||
Name: "global", |
||||
Config: map[string]interface{}{ |
||||
"DestinationServiceName": "foo", |
||||
}, |
||||
} |
||||
|
||||
// Create
|
||||
require.NoError(s.EnsureConfigEntry(1, expected)) |
||||
|
||||
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global") |
||||
require.NoError(err) |
||||
require.Equal(uint64(1), idx) |
||||
require.Equal(expected, config) |
||||
|
||||
// Update with invalid index
|
||||
updated := &structs.ProxyConfigEntry{ |
||||
Kind: structs.ProxyDefaults, |
||||
Name: "global", |
||||
Config: map[string]interface{}{ |
||||
"DestinationServiceName": "bar", |
||||
}, |
||||
} |
||||
ok, err := s.EnsureConfigEntryCAS(2, 99, updated) |
||||
require.False(ok) |
||||
require.NoError(err) |
||||
|
||||
// Entry should not be changed
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") |
||||
require.NoError(err) |
||||
require.Equal(uint64(1), idx) |
||||
require.Equal(expected, config) |
||||
|
||||
// Update with a valid index
|
||||
ok, err = s.EnsureConfigEntryCAS(2, 1, updated) |
||||
require.True(ok) |
||||
require.NoError(err) |
||||
|
||||
// Entry should be updated
|
||||
idx, config, err = s.ConfigEntry(structs.ProxyDefaults, "global") |
||||
require.NoError(err) |
||||
require.Equal(uint64(2), idx) |
||||
require.Equal(updated, config) |
||||
} |
||||
|
||||
func TestStore_ConfigEntries(t *testing.T) { |
||||
require := require.New(t) |
||||
s := testStateStore(t) |
||||
|
||||
// Create some config entries.
|
||||
entry1 := &structs.ProxyConfigEntry{ |
||||
Kind: structs.ProxyDefaults, |
||||
Name: "test1", |
||||
} |
||||
entry2 := &structs.ServiceConfigEntry{ |
||||
Kind: structs.ServiceDefaults, |
||||
Name: "test2", |
||||
} |
||||
|
||||
require.NoError(s.EnsureConfigEntry(0, entry1)) |
||||
require.NoError(s.EnsureConfigEntry(1, entry2)) |
||||
|
||||
// Get all entries
|
||||
idx, entries, err := s.ConfigEntries() |
||||
require.NoError(err) |
||||
require.Equal(uint64(1), idx) |
||||
require.Equal([]structs.ConfigEntry{entry1, entry2}, entries) |
||||
|
||||
// Get all proxy entries
|
||||
idx, entries, err = s.ConfigEntriesByKind(structs.ProxyDefaults) |
||||
require.NoError(err) |
||||
require.Equal(uint64(1), idx) |
||||
require.Equal([]structs.ConfigEntry{entry1}, entries) |
||||
|
||||
// Get all service entries
|
||||
idx, entries, err = s.ConfigEntriesByKind(structs.ServiceDefaults) |
||||
require.NoError(err) |
||||
require.Equal(uint64(1), idx) |
||||
require.Equal([]structs.ConfigEntry{entry2}, entries) |
||||
} |
@ -0,0 +1,230 @@
|
||||
package structs |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strings" |
||||
|
||||
"github.com/hashicorp/go-msgpack/codec" |
||||
) |
||||
|
||||
const ( |
||||
ServiceDefaults string = "service-defaults" |
||||
ProxyDefaults string = "proxy-defaults" |
||||
|
||||
ProxyConfigGlobal string = "global" |
||||
|
||||
DefaultServiceProtocol = "tcp" |
||||
) |
||||
|
||||
// ConfigEntry is the
|
||||
type ConfigEntry interface { |
||||
GetKind() string |
||||
GetName() string |
||||
|
||||
// This is called in the RPC endpoint and can apply defaults or limits.
|
||||
Normalize() error |
||||
Validate() error |
||||
|
||||
GetRaftIndex() *RaftIndex |
||||
} |
||||
|
||||
// ServiceConfiguration is the top-level struct for the configuration of a service
|
||||
// across the entire cluster.
|
||||
type ServiceConfigEntry struct { |
||||
Kind string |
||||
Name string |
||||
Protocol string |
||||
Connect ConnectConfiguration |
||||
ServiceDefinitionDefaults ServiceDefinitionDefaults |
||||
|
||||
RaftIndex |
||||
} |
||||
|
||||
func (e *ServiceConfigEntry) GetKind() string { |
||||
return ServiceDefaults |
||||
} |
||||
|
||||
func (e *ServiceConfigEntry) GetName() string { |
||||
if e == nil { |
||||
return "" |
||||
} |
||||
|
||||
return e.Name |
||||
} |
||||
|
||||
func (e *ServiceConfigEntry) Normalize() error { |
||||
if e == nil { |
||||
return fmt.Errorf("config entry is nil") |
||||
} |
||||
|
||||
e.Kind = ServiceDefaults |
||||
if e.Protocol == "" { |
||||
e.Protocol = DefaultServiceProtocol |
||||
} else { |
||||
e.Protocol = strings.ToLower(e.Protocol) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (e *ServiceConfigEntry) Validate() error { |
||||
return nil |
||||
} |
||||
|
||||
func (e *ServiceConfigEntry) GetRaftIndex() *RaftIndex { |
||||
if e == nil { |
||||
return &RaftIndex{} |
||||
} |
||||
|
||||
return &e.RaftIndex |
||||
} |
||||
|
||||
type ConnectConfiguration struct { |
||||
SidecarProxy bool |
||||
} |
||||
|
||||
type ServiceDefinitionDefaults struct { |
||||
EnableTagOverride bool |
||||
|
||||
// Non script/docker checks only
|
||||
Check *HealthCheck |
||||
Checks HealthChecks |
||||
|
||||
// Kind is allowed to accommodate non-sidecar proxies but it will be an error
|
||||
// if they also set Connect.DestinationServiceID since sidecars are
|
||||
// configured via their associated service's config.
|
||||
Kind ServiceKind |
||||
|
||||
// Only DestinationServiceName and Config are supported.
|
||||
Proxy ConnectProxyConfig |
||||
|
||||
Connect ServiceConnect |
||||
|
||||
Weights Weights |
||||
} |
||||
|
||||
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
|
||||
type ProxyConfigEntry struct { |
||||
Kind string |
||||
Name string |
||||
Config map[string]interface{} |
||||
|
||||
RaftIndex |
||||
} |
||||
|
||||
func (e *ProxyConfigEntry) GetKind() string { |
||||
return ProxyDefaults |
||||
} |
||||
|
||||
func (e *ProxyConfigEntry) GetName() string { |
||||
if e == nil { |
||||
return "" |
||||
} |
||||
|
||||
return e.Name |
||||
} |
||||
|
||||
func (e *ProxyConfigEntry) Normalize() error { |
||||
if e == nil { |
||||
return fmt.Errorf("config entry is nil") |
||||
} |
||||
|
||||
e.Kind = ProxyDefaults |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (e *ProxyConfigEntry) Validate() error { |
||||
if e == nil { |
||||
return fmt.Errorf("config entry is nil") |
||||
} |
||||
|
||||
if e.Name != ProxyConfigGlobal { |
||||
return fmt.Errorf("invalid name (%q), only %q is supported", e.Name, ProxyConfigGlobal) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (e *ProxyConfigEntry) GetRaftIndex() *RaftIndex { |
||||
if e == nil { |
||||
return &RaftIndex{} |
||||
} |
||||
|
||||
return &e.RaftIndex |
||||
} |
||||
|
||||
type ConfigEntryOp string |
||||
|
||||
const ( |
||||
ConfigEntryUpsert ConfigEntryOp = "upsert" |
||||
ConfigEntryDelete ConfigEntryOp = "delete" |
||||
) |
||||
|
||||
type ConfigEntryRequest struct { |
||||
Op ConfigEntryOp |
||||
Entry ConfigEntry |
||||
} |
||||
|
||||
func (r *ConfigEntryRequest) MarshalBinary() (data []byte, err error) { |
||||
// bs will grow if needed but allocate enough to avoid reallocation in common
|
||||
// case.
|
||||
bs := make([]byte, 128) |
||||
enc := codec.NewEncoderBytes(&bs, msgpackHandle) |
||||
// Encode kind first
|
||||
err = enc.Encode(r.Entry.GetKind()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
// Then actual value using alias trick to avoid infinite recursion
|
||||
type Alias ConfigEntryRequest |
||||
err = enc.Encode(struct { |
||||
*Alias |
||||
}{ |
||||
Alias: (*Alias)(r), |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return bs, nil |
||||
} |
||||
|
||||
func (r *ConfigEntryRequest) UnmarshalBinary(data []byte) error { |
||||
// First decode the kind prefix
|
||||
var kind string |
||||
dec := codec.NewDecoderBytes(data, msgpackHandle) |
||||
if err := dec.Decode(&kind); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Then decode the real thing with appropriate kind of ConfigEntry
|
||||
entry, err := makeConfigEntry(kind) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
r.Entry = entry |
||||
|
||||
// Alias juggling to prevent infinite recursive calls back to this decode
|
||||
// method.
|
||||
type Alias ConfigEntryRequest |
||||
as := struct { |
||||
*Alias |
||||
}{ |
||||
Alias: (*Alias)(r), |
||||
} |
||||
if err := dec.Decode(&as); err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func makeConfigEntry(kind string) (ConfigEntry, error) { |
||||
switch kind { |
||||
case ServiceDefaults: |
||||
return &ServiceConfigEntry{}, nil |
||||
case ProxyDefaults: |
||||
return &ProxyConfigEntry{}, nil |
||||
default: |
||||
return nil, fmt.Errorf("invalid config entry kind: %s", kind) |
||||
} |
||||
} |
Loading…
Reference in new issue