mirror of https://github.com/hashicorp/consul
state: Update docstrings for changeTrackerDB and txn
And un-embed memdb.DB to prevent accidental access to underlying methods.pull/8007/head
parent
f6ac08be04
commit
59bac0f99d
|
@ -4,41 +4,45 @@ import (
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
// memDBWrapper is a thin shim over memdb.DB which forces all new tranactions to
|
// memDBWrapper is a thin wrapper around memdb.DB which enables TrackChanges on
|
||||||
// report changes and for those changes to automatically deliver the changed
|
// all write transactions. When the transaction is committed the changes are
|
||||||
// objects to our central event handler in case new streaming events need to be
|
// sent to the eventPublisher which will create and emit change events.
|
||||||
// omitted.
|
|
||||||
type memDBWrapper struct {
|
type memDBWrapper struct {
|
||||||
*memdb.MemDB
|
db *memdb.MemDB
|
||||||
// TODO: add publisher
|
// TODO: add publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
// Txn intercepts MemDB.Txn(). It allows read-only transactions to pass through
|
// Txn exists to maintain backwards compatibility with memdb.DB.Txn. Preexisting
|
||||||
// but fails write transactions as we now need to force callers to use a
|
// code may use it to create a read-only transaction, but it will panic if called
|
||||||
// slightly different API so that change capture can happen automatically. The
|
// with write=true.
|
||||||
// returned Txn object is wrapped with a no-op wrapper that just keeps all
|
//
|
||||||
// transactions in our state store the same type. The wrapper only has
|
// Deprecated: use either ReadTxn, or WriteTxn.
|
||||||
// non-passthrough behavior for write transactions though.
|
|
||||||
func (db *memDBWrapper) Txn(write bool) *txnWrapper {
|
func (db *memDBWrapper) Txn(write bool) *txnWrapper {
|
||||||
if write {
|
if write {
|
||||||
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
|
panic("don't use db.Txn(true), use db.WriteTxn(idx uin64)")
|
||||||
}
|
}
|
||||||
return &txnWrapper{
|
return db.ReadTxn()
|
||||||
Txn: db.MemDB.Txn(false),
|
}
|
||||||
}
|
|
||||||
|
// ReadTxn returns a read-only transaction which behaves exactly the same as
|
||||||
|
// memdb.Txn
|
||||||
|
func (db *memDBWrapper) ReadTxn() *txnWrapper {
|
||||||
|
return &txnWrapper{Txn: db.db.Txn(false)}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
|
// WriteTxn returns a wrapped memdb.Txn suitable for writes to the state store.
|
||||||
// It will track changes and publish events for them automatically when Commit
|
// It will track changes and publish events for the changes when Commit
|
||||||
// is called. The idx argument should be the index of the currently operating
|
// is called.
|
||||||
// Raft operation. Almost all mutations to state should happen as part of a raft
|
//
|
||||||
// apply so the index of that log being applied should be plumbed through to
|
// The idx argument must be the index of the current Raft operation. Almost
|
||||||
// here. A few exceptional cases are transactions that are only executed on a
|
// all mutations to state should happen as part of a raft apply so the index of
|
||||||
// fresh memdb store during a Restore and a few places in tests where we insert
|
// the log being applied can be passed to WriteTxn.
|
||||||
|
// The exceptional cases are transactions that are executed on an empty
|
||||||
|
// memdb.DB as part of Restore, and those executed by tests where we insert
|
||||||
// data directly into the DB. These cases may use WriteTxnRestore.
|
// data directly into the DB. These cases may use WriteTxnRestore.
|
||||||
func (db *memDBWrapper) WriteTxn(idx uint64) *txnWrapper {
|
func (db *memDBWrapper) WriteTxn(idx uint64) *txnWrapper {
|
||||||
t := &txnWrapper{
|
t := &txnWrapper{
|
||||||
Txn: db.MemDB.Txn(true),
|
Txn: db.db.Txn(true),
|
||||||
Index: idx,
|
Index: idx,
|
||||||
}
|
}
|
||||||
t.Txn.TrackChanges()
|
t.Txn.TrackChanges()
|
||||||
|
@ -47,40 +51,43 @@ func (db *memDBWrapper) WriteTxn(idx uint64) *txnWrapper {
|
||||||
|
|
||||||
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
|
// WriteTxnRestore returns a wrapped RW transaction that does NOT have change
|
||||||
// tracking enabled. This should only be used in Restore where we need to
|
// tracking enabled. This should only be used in Restore where we need to
|
||||||
// replace the entire contents of the Store without a need to track the changes
|
// replace the entire contents of the Store without a need to track the changes.
|
||||||
// made and emit events. Subscribers will all reset on a restore and start
|
// WriteTxnRestore uses a zero index since the whole restore doesn't really occur
|
||||||
// again. It also uses a zero index since the whole restore doesn't really occur
|
|
||||||
// at one index - the effect is to write many values that were previously
|
// at one index - the effect is to write many values that were previously
|
||||||
// written across many indexes.
|
// written across many indexes.
|
||||||
func (db *memDBWrapper) WriteTxnRestore() *txnWrapper {
|
func (db *memDBWrapper) WriteTxnRestore() *txnWrapper {
|
||||||
t := &txnWrapper{
|
t := &txnWrapper{
|
||||||
Txn: db.MemDB.Txn(true),
|
Txn: db.db.Txn(true),
|
||||||
Index: 0,
|
Index: 0,
|
||||||
}
|
}
|
||||||
return t
|
return t
|
||||||
}
|
}
|
||||||
|
|
||||||
// txnWrapper wraps a memdb.Txn to automatically capture changes and process
|
// txnWrapper wraps a memdb.Txn to capture changes and send them to the
|
||||||
// events for the write as it commits. This can't be done just with txn.Defer
|
// EventPublisher.
|
||||||
// because errors the callback is invoked after commit completes so errors
|
//
|
||||||
// during event publishing would cause silent dropped events while the state
|
// This can not be done with txn.Defer because the callback passed to Defer is
|
||||||
// store still changed and the write looked successful from the outside.
|
// invoked after commit completes, and because the callback can not return an
|
||||||
|
// error. Any errors from the callback would be lost, which would result in a
|
||||||
|
// missing change event, even though the state store had changed.
|
||||||
type txnWrapper struct {
|
type txnWrapper struct {
|
||||||
// Index stores the index the write is occuring at in raft if this is a write
|
// Index in raft where the write is occurring. The value is zero for a
|
||||||
// transaction. If it's zero it means this is a read transaction.
|
// read-only transaction, and for a WriteTxnRestore transaction.
|
||||||
|
// Index is stored so that it may be passed along to any subscribers as part
|
||||||
|
// of a change event.
|
||||||
Index uint64
|
Index uint64
|
||||||
*memdb.Txn
|
*memdb.Txn
|
||||||
|
|
||||||
store *Store
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit overrides Commit on the underlying memdb.Txn and causes events to be
|
// Commit first pushes changes to EventPublisher, then calls Commit on the
|
||||||
// published for any changes. Note that it has a different signature to
|
// underlying transaction.
|
||||||
// memdb.Txn - returning an error that should be checked since an error implies
|
|
||||||
// that something prevented the commit from completing.
|
|
||||||
//
|
//
|
||||||
|
// Note that this function, unlike memdb.Txn, returns an error which must be checked
|
||||||
|
// by the caller. A non-nil error indicates that a commit failed and was not
|
||||||
|
// applied.
|
||||||
// TODO: currently none of the callers check error, should they all be changed?
|
// TODO: currently none of the callers check error, should they all be changed?
|
||||||
func (tx *txnWrapper) Commit() error {
|
func (tx *txnWrapper) Commit() error {
|
||||||
|
// changes may be empty if this is a read-only or WriteTxnRestore transaction.
|
||||||
//changes := tx.Txn.Changes()
|
//changes := tx.Txn.Changes()
|
||||||
// TODO: publish changes
|
// TODO: publish changes
|
||||||
|
|
||||||
|
|
|
@ -161,7 +161,7 @@ func NewStateStore(gc *TombstoneGC) (*Store, error) {
|
||||||
lockDelay: NewDelay(),
|
lockDelay: NewDelay(),
|
||||||
}
|
}
|
||||||
s.db = &memDBWrapper{
|
s.db = &memDBWrapper{
|
||||||
MemDB: db,
|
db: db,
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue