mirror of https://github.com/hashicorp/consul
parent
9d07add047
commit
17aa6a5a34
@ -0,0 +1,157 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
const (
|
||||
configTableName = "config-entries"
|
||||
)
|
||||
|
||||
// configTableSchema returns a new table schema used to store global service
|
||||
// and proxy configurations.
|
||||
func configTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: configTableName,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": &memdb.IndexSchema{
|
||||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Kind",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Name",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
registerSchema(configTableSchema)
|
||||
}
|
||||
|
||||
// ConfigEntries is used to pull all the config entries for the snapshot.
|
||||
func (s *Snapshot) ConfigEntries() ([]structs.ConfigEntry, error) {
|
||||
ixns, err := s.tx.Get(configTableName, "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ret []structs.ConfigEntry
|
||||
for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() {
|
||||
ret = append(ret, wrapped.(structs.ConfigEntry))
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Configuration is used when restoring from a snapshot.
|
||||
func (s *Restore) ConfigEntry(c structs.ConfigEntry) error {
|
||||
// Insert
|
||||
if err := s.tx.Insert(configTableName, c); err != nil {
|
||||
return fmt.Errorf("failed restoring config entry object: %s", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(s.tx, c.GetRaftIndex().ModifyIndex, configTableName); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Configuration is called to get a given config entry.
|
||||
func (s *Store) ConfigEntry(kind, name string) (uint64, structs.ConfigEntry, error) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, configTableName)
|
||||
|
||||
// Get the existing config entry.
|
||||
existing, err := tx.First(configTableName, "id", kind, name)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed config entry lookup: %s", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
conf, ok := existing.(structs.ConfigEntry)
|
||||
if !ok {
|
||||
return 0, nil, fmt.Errorf("config entry %q (%s) is an invalid type: %T", name, kind, conf)
|
||||
}
|
||||
|
||||
return idx, conf, nil
|
||||
}
|
||||
|
||||
// EnsureConfigEntry is called to upsert creation of a given config entry.
|
||||
func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Check for existing configuration.
|
||||
existing, err := tx.First(configTableName, "id", conf.GetKind(), conf.GetName())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed configuration lookup: %s", err)
|
||||
}
|
||||
|
||||
raftIndex := conf.GetRaftIndex()
|
||||
if existing != nil {
|
||||
existingIdx := existing.(structs.ConfigEntry).GetRaftIndex()
|
||||
raftIndex.CreateIndex = existingIdx.CreateIndex
|
||||
raftIndex.ModifyIndex = existingIdx.ModifyIndex
|
||||
} else {
|
||||
raftIndex.CreateIndex = idx
|
||||
}
|
||||
raftIndex.ModifyIndex = idx
|
||||
|
||||
// Insert the config entry and update the index
|
||||
if err := tx.Insert(configTableName, conf); err != nil {
|
||||
return fmt.Errorf("failed inserting config entry: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) DeleteConfigEntry(kind, name string) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the index
|
||||
idx := maxIndexTxn(tx, configTableName)
|
||||
|
||||
// Try to retrieve the existing health check.
|
||||
existing, err := tx.First(configTableName, "id", kind, name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed config entry lookup: %s", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the config entry from the DB and update the index.
|
||||
if err := tx.Delete(configTableName, existing); err != nil {
|
||||
return fmt.Errorf("failed removing check: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{configTableName, idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func TestStore_ConfigEntry(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
expected := &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: "global",
|
||||
ProxyConfig: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "foo",
|
||||
},
|
||||
}
|
||||
|
||||
// Create
|
||||
if err := s.EnsureConfigEntry(0, expected); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if idx != 0 {
|
||||
t.Fatalf("bad: %d", idx)
|
||||
}
|
||||
if !reflect.DeepEqual(expected, config) {
|
||||
t.Fatalf("bad: %#v, %#v", expected, config)
|
||||
}
|
||||
}
|
||||
|
||||
// Update
|
||||
updated := &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: "global",
|
||||
ProxyConfig: structs.ConnectProxyConfig{
|
||||
DestinationServiceName: "bar",
|
||||
},
|
||||
}
|
||||
if err := s.EnsureConfigEntry(1, updated); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
{
|
||||
idx, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if idx != 1 {
|
||||
t.Fatalf("bad: %d", idx)
|
||||
}
|
||||
if !reflect.DeepEqual(updated, config) {
|
||||
t.Fatalf("bad: %#v, %#v", updated, config)
|
||||
}
|
||||
}
|
||||
|
||||
// Delete
|
||||
if err := s.DeleteConfigEntry(structs.ProxyDefaults, "global"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, config, err := s.ConfigEntry(structs.ProxyDefaults, "global")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if config != nil {
|
||||
t.Fatalf("config should be deleted: %v", config)
|
||||
}
|
||||
}
|
@ -1,127 +0,0 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
)
|
||||
|
||||
const (
|
||||
configTableName = "configurations"
|
||||
)
|
||||
|
||||
// configTableSchema returns a new table schema used to store global service
|
||||
// and proxy configurations.
|
||||
func configTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: configTableName,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
"id": &memdb.IndexSchema{
|
||||
Name: "id",
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Kind",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Name",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
registerSchema(configTableSchema)
|
||||
}
|
||||
|
||||
// Configurations is used to pull all the configurations for the snapshot.
|
||||
func (s *Snapshot) Configurations() ([]structs.Configuration, error) {
|
||||
ixns, err := s.tx.Get(configTableName, "id")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ret []structs.Configuration
|
||||
for wrapped := ixns.Next(); wrapped != nil; wrapped = ixns.Next() {
|
||||
ret = append(ret, wrapped.(structs.Configuration))
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
// Configuration is used when restoring from a snapshot.
|
||||
func (s *Restore) Configuration(c structs.Configuration) error {
|
||||
// Insert
|
||||
if err := s.tx.Insert(configTableName, c); err != nil {
|
||||
return fmt.Errorf("failed restoring configuration object: %s", err)
|
||||
}
|
||||
if err := indexUpdateMaxTxn(s.tx, c.ModifyIndex, configTableName); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureConfiguration is called to upsert creation of a given configuration.
|
||||
func (s *Store) EnsureConfiguration(idx uint64, conf structs.Configuration) error {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Does it make sense to validate here? We do this for service meta in the state store
|
||||
// but could also do this in RPC endpoint. More version compatibility that way?
|
||||
if err := conf.Validate(); err != nil {
|
||||
return fmt.Errorf("failed validating config: %v", err)
|
||||
}
|
||||
|
||||
// Check for existing configuration.
|
||||
existing, err := tx.First("configurations", "id", conf.GetKind(), conf.GetName())
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed configuration lookup: %s", err)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
conf.CreateIndex = serviceNode.CreateIndex
|
||||
conf.ModifyIndex = serviceNode.ModifyIndex
|
||||
} else {
|
||||
conf.CreateIndex = idx
|
||||
}
|
||||
conf.ModifyIndex = idx
|
||||
|
||||
// Insert the configuration and update the index
|
||||
if err := tx.Insert("configurations", conf); err != nil {
|
||||
return fmt.Errorf("failed inserting service: %s", err)
|
||||
}
|
||||
if err := tx.Insert("index", &IndexEntry{"configurations", idx}); err != nil {
|
||||
return fmt.Errorf("failed updating index: %s", err)
|
||||
}
|
||||
|
||||
tx.Commit()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Configuration is called to get a given configuration.
|
||||
func (s *Store) Configuration(idx uint64, kind structs.ConfigurationKind, name string) (structs.Configuration, error) {
|
||||
tx := s.db.Txn(true)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the existing configuration.
|
||||
existing, err := tx.First("configurations", "id", kind, name)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed configuration lookup: %s", err)
|
||||
}
|
||||
|
||||
conf, ok := existing.(structs.Configuration)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("configuration %q (%s) is an invalid type: %T", name, kind, conf)
|
||||
}
|
||||
|
||||
return conf, nil
|
||||
}
|
Loading…
Reference in new issue