mirror of https://github.com/hashicorp/consul
427 lines
10 KiB
Go
427 lines
10 KiB
Go
package memdb
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/hashicorp/go-immutable-radix"
|
|
)
|
|
|
|
const (
|
|
id = "id"
|
|
)
|
|
// tableIndex is a tuple of (Table, Index) used for lookups
|
|
type tableIndex struct {
|
|
Table string
|
|
Index string
|
|
}
|
|
|
|
// Txn is a transaction against a MemDB.
|
|
// This can be a read or write transaction.
|
|
type Txn struct {
|
|
db *MemDB
|
|
write bool
|
|
rootTxn *iradix.Txn
|
|
after []func()
|
|
|
|
modified map[tableIndex]*iradix.Txn
|
|
}
|
|
|
|
// readableIndex returns a transaction usable for reading the given
|
|
// index in a table. If a write transaction is in progress, we may need
|
|
// to use an existing modified txn.
|
|
func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
|
|
// Look for existing transaction
|
|
if txn.write && txn.modified != nil {
|
|
key := tableIndex{table, index}
|
|
exist, ok := txn.modified[key]
|
|
if ok {
|
|
return exist
|
|
}
|
|
}
|
|
|
|
// Create a read transaction
|
|
path := indexPath(table, index)
|
|
raw, _ := txn.rootTxn.Get(path)
|
|
indexTxn := raw.(*iradix.Tree).Txn()
|
|
return indexTxn
|
|
}
|
|
|
|
// writableIndex returns a transaction usable for modifying the
|
|
// given index in a table.
|
|
func (txn *Txn) writableIndex(table, index string) *iradix.Txn {
|
|
if txn.modified == nil {
|
|
txn.modified = make(map[tableIndex]*iradix.Txn)
|
|
}
|
|
|
|
// Look for existing transaction
|
|
key := tableIndex{table, index}
|
|
exist, ok := txn.modified[key]
|
|
if ok {
|
|
return exist
|
|
}
|
|
|
|
// Start a new transaction
|
|
path := indexPath(table, index)
|
|
raw, _ := txn.rootTxn.Get(path)
|
|
indexTxn := raw.(*iradix.Tree).Txn()
|
|
|
|
// Keep this open for the duration of the txn
|
|
txn.modified[key] = indexTxn
|
|
return indexTxn
|
|
}
|
|
|
|
// Abort is used to cancel this transaction.
|
|
// This is a noop for read transactions.
|
|
func (txn *Txn) Abort() {
|
|
// Noop for a read transaction
|
|
if !txn.write {
|
|
return
|
|
}
|
|
|
|
// Check if already aborted or committed
|
|
if txn.rootTxn == nil {
|
|
return
|
|
}
|
|
|
|
// Clear the txn
|
|
txn.rootTxn = nil
|
|
txn.modified = nil
|
|
|
|
// Release the writer lock since this is invalid
|
|
txn.db.writer.Unlock()
|
|
}
|
|
|
|
// Commit is used to finalize this transaction.
|
|
// This is a noop for read transactions.
|
|
func (txn *Txn) Commit() {
|
|
// Noop for a read transaction
|
|
if !txn.write {
|
|
return
|
|
}
|
|
|
|
// Check if already aborted or committed
|
|
if txn.rootTxn == nil {
|
|
return
|
|
}
|
|
|
|
// Commit each sub-transaction scoped to (table, index)
|
|
for key, subTxn := range txn.modified {
|
|
path := indexPath(key.Table, key.Index)
|
|
final := subTxn.Commit()
|
|
txn.rootTxn.Insert(path, final)
|
|
}
|
|
|
|
// Update the root of the DB
|
|
txn.db.root = txn.rootTxn.Commit()
|
|
|
|
// Clear the txn
|
|
txn.rootTxn = nil
|
|
txn.modified = nil
|
|
|
|
// Release the writer lock since this is invalid
|
|
txn.db.writer.Unlock()
|
|
|
|
// Run the deferred functions, if any
|
|
for i := len(txn.after); i > 0; i-- {
|
|
fn := txn.after[i-1]
|
|
fn()
|
|
}
|
|
}
|
|
|
|
// Insert is used to add or update an object into the given table
|
|
func (txn *Txn) Insert(table string, obj interface{}) error {
|
|
if !txn.write {
|
|
return fmt.Errorf("cannot insert in read-only transaction")
|
|
}
|
|
|
|
// Get the table schema
|
|
tableSchema, ok := txn.db.schema.Tables[table]
|
|
if !ok {
|
|
return fmt.Errorf("invalid table '%s'", table)
|
|
}
|
|
|
|
// Get the primary ID of the object
|
|
idSchema := tableSchema.Indexes[id]
|
|
ok, idVal, err := idSchema.Indexer.FromObject(obj)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build primary index: %v", err)
|
|
}
|
|
if !ok {
|
|
return fmt.Errorf("object missing primary index")
|
|
}
|
|
|
|
// Lookup the object by ID first, to see if this is an update
|
|
idTxn := txn.writableIndex(table, id)
|
|
existing, update := idTxn.Get(idVal)
|
|
|
|
// On an update, there is an existing object with the given
|
|
// primary ID. We do the update by deleting the current object
|
|
// and inserting the new object.
|
|
for name, indexSchema := range tableSchema.Indexes {
|
|
indexTxn := txn.writableIndex(table, name)
|
|
|
|
// Handle the update by deleting from the index first
|
|
if update {
|
|
ok, val, err := indexSchema.Indexer.FromObject(existing)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build index '%s': %v", name, err)
|
|
}
|
|
if ok {
|
|
// Handle non-unique index by computing a unique index.
|
|
// This is done by appending the primary key which must
|
|
// be unique anyways.
|
|
if !indexSchema.Unique {
|
|
val = append(val, idVal...)
|
|
}
|
|
indexTxn.Delete(val)
|
|
}
|
|
}
|
|
|
|
// Handle the insert after the update
|
|
ok, val, err := indexSchema.Indexer.FromObject(obj)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build index '%s': %v", name, err)
|
|
}
|
|
if !ok {
|
|
if indexSchema.AllowMissing {
|
|
continue
|
|
} else {
|
|
return fmt.Errorf("missing value for index '%s'", name)
|
|
}
|
|
}
|
|
|
|
// Handle non-unique index by computing a unique index.
|
|
// This is done by appending the primary key which must
|
|
// be unique anyways.
|
|
if !indexSchema.Unique {
|
|
val = append(val, idVal...)
|
|
}
|
|
indexTxn.Insert(val, obj)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete is used to delete a single object from the given table
|
|
// This object must already exist in the table
|
|
func (txn *Txn) Delete(table string, obj interface{}) error {
|
|
if !txn.write {
|
|
return fmt.Errorf("cannot delete in read-only transaction")
|
|
}
|
|
|
|
// Get the table schema
|
|
tableSchema, ok := txn.db.schema.Tables[table]
|
|
if !ok {
|
|
return fmt.Errorf("invalid table '%s'", table)
|
|
}
|
|
|
|
// Get the primary ID of the object
|
|
idSchema := tableSchema.Indexes[id]
|
|
ok, idVal, err := idSchema.Indexer.FromObject(obj)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build primary index: %v", err)
|
|
}
|
|
if !ok {
|
|
return fmt.Errorf("object missing primary index")
|
|
}
|
|
|
|
// Lookup the object by ID first, check fi we should continue
|
|
idTxn := txn.writableIndex(table, id)
|
|
existing, ok := idTxn.Get(idVal)
|
|
if !ok {
|
|
return fmt.Errorf("not found")
|
|
}
|
|
|
|
// Remove the object from all the indexes
|
|
for name, indexSchema := range tableSchema.Indexes {
|
|
indexTxn := txn.writableIndex(table, name)
|
|
|
|
// Handle the update by deleting from the index first
|
|
ok, val, err := indexSchema.Indexer.FromObject(existing)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to build index '%s': %v", name, err)
|
|
}
|
|
if ok {
|
|
// Handle non-unique index by computing a unique index.
|
|
// This is done by appending the primary key which must
|
|
// be unique anyways.
|
|
if !indexSchema.Unique {
|
|
val = append(val, idVal...)
|
|
}
|
|
indexTxn.Delete(val)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteAll is used to delete all the objects in a given table
|
|
// matching the constraints on the index
|
|
func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
|
|
if !txn.write {
|
|
return 0, fmt.Errorf("cannot delete in read-only transaction")
|
|
}
|
|
|
|
// Get all the objects
|
|
iter, err := txn.Get(table, index, args...)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// Put them into a slice so there are no safety concerns while actually
|
|
// performing the deletes
|
|
var objs []interface{}
|
|
for {
|
|
obj := iter.Next()
|
|
if obj == nil {
|
|
break
|
|
}
|
|
|
|
objs = append(objs, obj)
|
|
}
|
|
|
|
// Do the deletes
|
|
num := 0
|
|
for _, obj := range(objs) {
|
|
if err := txn.Delete(table, obj); err != nil {
|
|
return num, err
|
|
}
|
|
num++
|
|
}
|
|
return num, nil
|
|
}
|
|
|
|
// First is used to return the first matching object for
|
|
// the given constraints on the index
|
|
func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
|
|
// Get the index value
|
|
indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get the index itself
|
|
indexTxn := txn.readableIndex(table, indexSchema.Name)
|
|
|
|
// Do an exact lookup
|
|
if indexSchema.Unique && val != nil && indexSchema.Name == index {
|
|
obj, ok := indexTxn.Get(val)
|
|
if !ok {
|
|
return nil, nil
|
|
}
|
|
return obj, nil
|
|
}
|
|
|
|
// Handle non-unique index by using an iterator and getting the first value
|
|
iter := indexTxn.Root().Iterator()
|
|
iter.SeekPrefix(val)
|
|
_, value, _ := iter.Next()
|
|
return value, nil
|
|
}
|
|
|
|
// getIndexValue is used to get the IndexSchema and the value
|
|
// used to scan the index given the parameters. This handles prefix based
|
|
// scans when the index has the "_prefix" suffix. The index must support
|
|
// prefix iteration.
|
|
func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexSchema, []byte, error) {
|
|
// Get the table schema
|
|
tableSchema, ok := txn.db.schema.Tables[table]
|
|
if !ok {
|
|
return nil, nil, fmt.Errorf("invalid table '%s'", table)
|
|
}
|
|
|
|
// Check for a prefix scan
|
|
prefixScan := false
|
|
if strings.HasSuffix(index, "_prefix") {
|
|
index = strings.TrimSuffix(index, "_prefix")
|
|
prefixScan = true
|
|
}
|
|
|
|
// Get the index schema
|
|
indexSchema, ok := tableSchema.Indexes[index]
|
|
if !ok {
|
|
return nil, nil, fmt.Errorf("invalid index '%s'", index)
|
|
}
|
|
|
|
// Hot-path for when there are no arguments
|
|
if len(args) == 0 {
|
|
return indexSchema, nil, nil
|
|
}
|
|
|
|
// Special case the prefix scanning
|
|
if prefixScan {
|
|
prefixIndexer, ok := indexSchema.Indexer.(PrefixIndexer)
|
|
if !ok {
|
|
return indexSchema, nil,
|
|
fmt.Errorf("index '%s' does not support prefix scanning", index)
|
|
}
|
|
|
|
val, err := prefixIndexer.PrefixFromArgs(args...)
|
|
if err != nil {
|
|
return indexSchema, nil, fmt.Errorf("index error: %v", err)
|
|
}
|
|
return indexSchema, val, err
|
|
}
|
|
|
|
// Get the exact match index
|
|
val, err := indexSchema.Indexer.FromArgs(args...)
|
|
if err != nil {
|
|
return indexSchema, nil, fmt.Errorf("index error: %v", err)
|
|
}
|
|
return indexSchema, val, err
|
|
}
|
|
|
|
// ResultIterator is used to iterate over a list of results
|
|
// from a Get query on a table.
|
|
type ResultIterator interface {
|
|
Next() interface{}
|
|
}
|
|
|
|
// Get is used to construct a ResultIterator over all the
|
|
// rows that match the given constraints of an index.
|
|
func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
|
|
// Get the index value to scan
|
|
indexSchema, val, err := txn.getIndexValue(table, index, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Get the index itself
|
|
indexTxn := txn.readableIndex(table, indexSchema.Name)
|
|
indexRoot := indexTxn.Root()
|
|
|
|
// Get an interator over the index
|
|
indexIter := indexRoot.Iterator()
|
|
|
|
// Seek the iterator to the appropriate sub-set
|
|
indexIter.SeekPrefix(val)
|
|
|
|
// Create an iterator
|
|
iter := &radixIterator{
|
|
iter: indexIter,
|
|
}
|
|
return iter, nil
|
|
}
|
|
|
|
// Defer is used to push a new arbitrary function onto a stack which
|
|
// gets called when a transaction is committed and finished. Deferred
|
|
// functions are called in LIFO order, and only invoked at the end of
|
|
// write transactions.
|
|
func (txn *Txn) Defer(fn func()) {
|
|
txn.after = append(txn.after, fn)
|
|
}
|
|
|
|
// radixIterator is used to wrap an underlying iradix iterator.
|
|
// This is much mroe efficient than a sliceIterator as we are not
|
|
// materializing the entire view.
|
|
type radixIterator struct {
|
|
iter *iradix.Iterator
|
|
}
|
|
|
|
func (r *radixIterator) Next() interface{} {
|
|
_, value, ok := r.iter.Next()
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return value
|
|
}
|