mirror of https://github.com/hashicorp/consul
txn: add node operations
parent
01e1b5b1df
commit
ab58986ac3
|
@ -373,6 +373,36 @@ func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *memdb.Txn, node *structs.Node
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensureNodeCASTxn updates a node only if the existing index matches the given index.
|
||||||
|
// Returns a bool indicating if a write happened and any error.
|
||||||
|
func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node) (bool, error) {
|
||||||
|
// Retrieve the existing entry.
|
||||||
|
existing, err := tx.First("nodes", "id", node.Node)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("node lookup failed: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the we should do the set. A ModifyIndex of 0 means that
|
||||||
|
// we are doing a set-if-not-exists.
|
||||||
|
if node.ModifyIndex == 0 && existing != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if node.ModifyIndex != 0 && existing == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
e, ok := existing.(*structs.Node)
|
||||||
|
if ok && node.ModifyIndex != 0 && node.ModifyIndex != e.ModifyIndex {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform the update.
|
||||||
|
if err := s.ensureNodeTxn(tx, idx, node); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ensureNodeTxn is the inner function called to actually create a node
|
// ensureNodeTxn is the inner function called to actually create a node
|
||||||
// registration or modify an existing one in the state store. It allows
|
// registration or modify an existing one in the state store. It allows
|
||||||
// passing in a memdb transaction so it may be part of a larger txn.
|
// passing in a memdb transaction so it may be part of a larger txn.
|
||||||
|
@ -569,6 +599,35 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deleteNodeCASTxn is used to try doing a node delete operation with a given
|
||||||
|
// raft index. If the CAS index specified is not equal to the last observed index for
|
||||||
|
// the given check, then the call is a noop, otherwise a normal check delete is invoked.
|
||||||
|
func (s *Store) deleteNodeCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName string) (bool, error) {
|
||||||
|
// Look up the node.
|
||||||
|
node, err := tx.First("nodes", "id", nodeName)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("check lookup failed: %s", err)
|
||||||
|
}
|
||||||
|
if node == nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the existing index does not match the provided CAS
|
||||||
|
// index arg, then we shouldn't update anything and can safely
|
||||||
|
// return early here.
|
||||||
|
existing, ok := node.(*structs.Node)
|
||||||
|
if !ok || existing.ModifyIndex != cidx {
|
||||||
|
return existing == nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call the actual deletion if the above passed.
|
||||||
|
if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
// deleteNodeTxn is the inner method used for removing a node from
|
// deleteNodeTxn is the inner method used for removing a node from
|
||||||
// the store within a given transaction.
|
// the store within a given transaction.
|
||||||
func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
|
func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error {
|
||||||
|
|
|
@ -122,6 +122,59 @@ func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntention
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// txnNode handles all Node-related operations.
|
||||||
|
func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) {
|
||||||
|
var entry *structs.Node
|
||||||
|
var err error
|
||||||
|
|
||||||
|
switch op.Verb {
|
||||||
|
case api.NodeGet:
|
||||||
|
entry, err = getNodeIDTxn(tx, op.Node.ID)
|
||||||
|
|
||||||
|
case api.NodeSet:
|
||||||
|
err = s.ensureNodeTxn(tx, idx, &op.Node)
|
||||||
|
|
||||||
|
case api.NodeCAS:
|
||||||
|
var ok bool
|
||||||
|
ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node)
|
||||||
|
if !ok && err == nil {
|
||||||
|
err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node)
|
||||||
|
}
|
||||||
|
|
||||||
|
case api.NodeDelete:
|
||||||
|
err = s.deleteNodeTxn(tx, idx, op.Node.Node)
|
||||||
|
|
||||||
|
case api.NodeDeleteCAS:
|
||||||
|
var ok bool
|
||||||
|
ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node)
|
||||||
|
if !ok && err == nil {
|
||||||
|
err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node)
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
err = fmt.Errorf("unknown Node verb %q", op.Verb)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// For a GET we keep the value, otherwise we clone and blank out the
|
||||||
|
// value (we have to clone so we don't modify the entry being used by
|
||||||
|
// the state store).
|
||||||
|
if entry != nil {
|
||||||
|
if op.Verb == api.NodeGet {
|
||||||
|
result := structs.TxnResult{Node: entry}
|
||||||
|
return structs.TxnResults{&result}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
clone := *entry
|
||||||
|
result := structs.TxnResult{Node: &clone}
|
||||||
|
return structs.TxnResults{&result}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
// txnCheck handles all Check-related operations.
|
// txnCheck handles all Check-related operations.
|
||||||
func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
|
func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) {
|
||||||
var entry *structs.HealthCheck
|
var entry *structs.HealthCheck
|
||||||
|
|
|
@ -19,6 +19,28 @@ type TxnKVOp struct {
|
||||||
// inside a transaction.
|
// inside a transaction.
|
||||||
type TxnKVResult *DirEntry
|
type TxnKVResult *DirEntry
|
||||||
|
|
||||||
|
// TxnNodeOp is used to define a single operation on a node in the catalog inside
|
||||||
|
// a transaction.
|
||||||
|
type TxnNodeOp struct {
|
||||||
|
Verb api.NodeOp
|
||||||
|
Node Node
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnNodeResult is used to define the result of a single operation on a node
|
||||||
|
// in the catalog inside a transaction.
|
||||||
|
type TxnNodeResult *Node
|
||||||
|
|
||||||
|
// TxnServiceOp is used to define a single operation on a service in the catalog inside
|
||||||
|
// a transaction.
|
||||||
|
type TxnServiceOp struct {
|
||||||
|
Verb api.ServiceOp
|
||||||
|
Service NodeService
|
||||||
|
}
|
||||||
|
|
||||||
|
// TxnServiceResult is used to define the result of a single operation on a service
|
||||||
|
// in the catalog inside a transaction.
|
||||||
|
type TxnServiceResult *NodeService
|
||||||
|
|
||||||
// TxnCheckOp is used to define a single operation on a health check inside a
|
// TxnCheckOp is used to define a single operation on a health check inside a
|
||||||
// transaction.
|
// transaction.
|
||||||
type TxnCheckOp struct {
|
type TxnCheckOp struct {
|
||||||
|
@ -39,6 +61,8 @@ type TxnIntentionOp IntentionRequest
|
||||||
type TxnOp struct {
|
type TxnOp struct {
|
||||||
KV *TxnKVOp
|
KV *TxnKVOp
|
||||||
Intention *TxnIntentionOp
|
Intention *TxnIntentionOp
|
||||||
|
Node *TxnNodeOp
|
||||||
|
Service *TxnServiceOp
|
||||||
Check *TxnCheckOp
|
Check *TxnCheckOp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,8 +111,10 @@ type TxnErrors []*TxnError
|
||||||
// TxnResult is used to define the result of a given operation inside a
|
// TxnResult is used to define the result of a given operation inside a
|
||||||
// transaction. Only one of the types should be filled out per entry.
|
// transaction. Only one of the types should be filled out per entry.
|
||||||
type TxnResult struct {
|
type TxnResult struct {
|
||||||
KV TxnKVResult
|
KV TxnKVResult
|
||||||
Check TxnCheckResult
|
Node TxnNodeResult
|
||||||
|
Service TxnServiceResult
|
||||||
|
Check TxnCheckResult
|
||||||
}
|
}
|
||||||
|
|
||||||
// TxnResults is a list of TxnResult entries.
|
// TxnResults is a list of TxnResult entries.
|
||||||
|
|
22
api/txn.go
22
api/txn.go
|
@ -89,6 +89,28 @@ type KVTxnResponse struct {
|
||||||
Errors TxnErrors
|
Errors TxnErrors
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NodeOp constants give possible operations available in a transaction.
|
||||||
|
type NodeOp string
|
||||||
|
|
||||||
|
const (
|
||||||
|
NodeGet NodeOp = "get"
|
||||||
|
NodeSet NodeOp = "set"
|
||||||
|
NodeCAS NodeOp = "cas"
|
||||||
|
NodeDelete NodeOp = "delete"
|
||||||
|
NodeDeleteCAS NodeOp = "delete-cas"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServiceOp constants give possible operations available in a transaction.
|
||||||
|
type ServiceOp string
|
||||||
|
|
||||||
|
const (
|
||||||
|
ServiceGet ServiceOp = "get"
|
||||||
|
ServiceSet ServiceOp = "set"
|
||||||
|
ServiceCAS ServiceOp = "cas"
|
||||||
|
ServiceDelete ServiceOp = "delete"
|
||||||
|
ServiceDeleteCAS ServiceOp = "delete-cas"
|
||||||
|
)
|
||||||
|
|
||||||
// CheckOp constants give possible operations available in a transaction.
|
// CheckOp constants give possible operations available in a transaction.
|
||||||
type CheckOp string
|
type CheckOp string
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue