mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
202 lines
5.5 KiB
202 lines
5.5 KiB
6 years ago
|
package api
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net/http"
|
||
|
)
|
||
|
|
||
|
// Txn is used to manipulate the Txn API
|
||
|
type Txn struct {
|
||
|
c *Client
|
||
|
}
|
||
|
|
||
|
// Txn is used to return a handle to the K/V apis
|
||
|
func (c *Client) Txn() *Txn {
|
||
|
return &Txn{c}
|
||
|
}
|
||
|
|
||
|
// TxnOp is the internal format we send to Consul. Currently only K/V and
|
||
|
// check operations are supported.
|
||
|
type TxnOp struct {
|
||
|
KV *KVTxnOp
|
||
|
Check *CheckTxnOp
|
||
|
}
|
||
|
|
||
|
// TxnOps is a list of transaction operations.
|
||
|
type TxnOps []*TxnOp
|
||
|
|
||
|
// TxnResult is the internal format we receive from Consul.
|
||
|
type TxnResult struct {
|
||
|
KV *KVPair
|
||
|
Check *HealthCheck
|
||
|
}
|
||
|
|
||
|
// TxnResults is a list of TxnResult objects.
|
||
|
type TxnResults []*TxnResult
|
||
|
|
||
|
// TxnError describes a problem with one operation in a transaction.
type TxnError struct {
	// OpIndex is the index of the operation the error refers to.
	OpIndex int
	// What is the accompanying error message.
	What string
}
|
||
|
|
||
|
// TxnErrors is a list of TxnError objects.
|
||
|
type TxnErrors []*TxnError
|
||
|
|
||
|
// TxnResponse is the internal format we receive from Consul.
|
||
|
type TxnResponse struct {
|
||
|
Results TxnResults
|
||
|
Errors TxnErrors
|
||
|
}
|
||
|
|
||
|
// KVOp names a K/V operation that can be performed inside a transaction. The
// supported values are the KV* constants.
type KVOp string
|
||
|
|
||
|
const (
|
||
|
KVSet KVOp = "set"
|
||
|
KVDelete KVOp = "delete"
|
||
|
KVDeleteCAS KVOp = "delete-cas"
|
||
|
KVDeleteTree KVOp = "delete-tree"
|
||
|
KVCAS KVOp = "cas"
|
||
|
KVLock KVOp = "lock"
|
||
|
KVUnlock KVOp = "unlock"
|
||
|
KVGet KVOp = "get"
|
||
|
KVGetTree KVOp = "get-tree"
|
||
|
KVCheckSession KVOp = "check-session"
|
||
|
KVCheckIndex KVOp = "check-index"
|
||
|
KVCheckNotExists KVOp = "check-not-exists"
|
||
|
)
|
||
|
|
||
|
// KVTxnOp defines a single operation inside a transaction.
|
||
|
type KVTxnOp struct {
|
||
|
Verb KVOp
|
||
|
Key string
|
||
|
Value []byte
|
||
|
Flags uint64
|
||
|
Index uint64
|
||
|
Session string
|
||
|
}
|
||
|
|
||
|
// KVTxnOps defines a set of operations to be performed inside a single
|
||
|
// transaction.
|
||
|
type KVTxnOps []*KVTxnOp
|
||
|
|
||
|
// KVTxnResponse has the outcome of a transaction.
|
||
|
type KVTxnResponse struct {
|
||
|
Results []*KVPair
|
||
|
Errors TxnErrors
|
||
|
}
|
||
|
|
||
|
// CheckOp names a health check operation that can be performed inside a
// transaction. The supported values are the Check* constants.
type CheckOp string
|
||
|
|
||
|
const (
|
||
|
CheckGet CheckOp = "get"
|
||
|
CheckSet CheckOp = "set"
|
||
|
CheckCAS CheckOp = "cas"
|
||
|
CheckDelete CheckOp = "delete"
|
||
|
CheckDeleteCAS CheckOp = "delete-cas"
|
||
|
)
|
||
|
|
||
|
// CheckTxnOp defines a single operation inside a transaction.
|
||
|
type CheckTxnOp struct {
|
||
|
Verb CheckOp
|
||
|
Check HealthCheck
|
||
|
}
|
||
|
|
||
|
// Txn is used to apply multiple Consul operations in a single, atomic transaction.
|
||
|
//
|
||
|
// Note that Go will perform the required base64 encoding on the values
|
||
|
// automatically because the type is a byte slice. Transactions are defined as a
|
||
|
// list of operations to perform, using the different fields in the TxnOp structure
|
||
|
// to define operations. If any operation fails, none of the changes are applied
|
||
|
// to the state store.
|
||
|
//
|
||
|
// Even though this is generally a write operation, we take a QueryOptions input
|
||
|
// and return a QueryMeta output. If the transaction contains only read ops, then
|
||
|
// Consul will fast-path it to a different endpoint internally which supports
|
||
|
// consistency controls, but not blocking. If there are write operations then
|
||
|
// the request will always be routed through raft and any consistency settings
|
||
|
// will be ignored.
|
||
|
//
|
||
|
// Here's an example:
|
||
|
//
|
||
|
// ops := KVTxnOps{
|
||
|
// &KVTxnOp{
|
||
|
// Verb: KVLock,
|
||
|
// Key: "test/lock",
|
||
|
// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
|
||
|
// Value: []byte("hello"),
|
||
|
// },
|
||
|
// &KVTxnOp{
|
||
|
// Verb: KVGet,
|
||
|
// Key: "another/key",
|
||
|
// },
|
||
|
// &CheckTxnOp{
|
||
|
// Verb: CheckSet,
|
||
|
// HealthCheck: HealthCheck{
|
||
|
// Node: "foo",
|
||
|
// CheckID: "redis:a",
|
||
|
// Name: "Redis Health Check",
|
||
|
// Status: "passing",
|
||
|
// },
|
||
|
// }
|
||
|
// }
|
||
|
// ok, response, _, err := kv.Txn(&ops, nil)
|
||
|
//
|
||
|
// If there is a problem making the transaction request then an error will be
|
||
|
// returned. Otherwise, the ok value will be true if the transaction succeeded
|
||
|
// or false if it was rolled back. The response is a structured return value which
|
||
|
// will have the outcome of the transaction. Its Results member will have entries
|
||
|
// for each operation. For KV operations, Deleted keys will have a nil entry in the
|
||
|
// results, and to save space, the Value of each key in the Results will be nil
|
||
|
// unless the operation is a KVGet. If the transaction was rolled back, the Errors
|
||
|
// member will have entries referencing the index of the operation that failed
|
||
|
// along with an error message.
|
||
|
func (t *Txn) Txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) {
|
||
|
return t.c.txn(txn, q)
|
||
|
}
|
||
|
|
||
|
func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) {
|
||
|
r := c.newRequest("PUT", "/v1/txn")
|
||
|
r.setQueryOptions(q)
|
||
|
|
||
|
// Convert into the internal txn format.
|
||
|
ops := make(TxnOps, 0, len(txn))
|
||
|
for _, kvOp := range txn {
|
||
|
switch {
|
||
|
case kvOp.KV != nil:
|
||
|
ops = append(ops, &TxnOp{KV: kvOp.KV})
|
||
|
case kvOp.Check != nil:
|
||
|
ops = append(ops, &TxnOp{Check: kvOp.Check})
|
||
|
}
|
||
|
}
|
||
|
r.obj = ops
|
||
|
rtt, resp, err := c.doRequest(r)
|
||
|
if err != nil {
|
||
|
return false, nil, nil, err
|
||
|
}
|
||
|
defer resp.Body.Close()
|
||
|
|
||
|
qm := &QueryMeta{}
|
||
|
parseQueryMeta(resp, qm)
|
||
|
qm.RequestTime = rtt
|
||
|
|
||
|
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict {
|
||
|
var txnResp TxnResponse
|
||
|
if err := decodeBody(resp, &txnResp); err != nil {
|
||
|
return false, nil, nil, err
|
||
|
}
|
||
|
|
||
|
return resp.StatusCode == http.StatusOK, &txnResp, qm, nil
|
||
|
}
|
||
|
|
||
|
var buf bytes.Buffer
|
||
|
if _, err := io.Copy(&buf, resp.Body); err != nil {
|
||
|
return false, nil, nil, fmt.Errorf("Failed to read response: %v", err)
|
||
|
}
|
||
|
return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String())
|
||
|
}
|