Moves txn code into a new endpoint, not specific to KV.

pull/2028/head
James Phillips 2016-05-10 21:41:47 -07:00
parent 1fefdcb962
commit 69f58ad04a
11 changed files with 710 additions and 660 deletions

View File

@ -317,7 +317,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
// entries referencing the index of the operation that failed along with an error // entries referencing the index of the operation that failed along with an error
// message. // message.
func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, error) { func (k *KV) Txn(txn *KVTxn, q *WriteOptions) (bool, *KVTxnResult, *WriteMeta, error) {
r := k.c.newRequest("PUT", "/v1/kv-txn") r := k.c.newRequest("PUT", "/v1/txn")
r.setWriteOptions(q) r.setWriteOptions(q)
r.obj = txn r.obj = txn

View File

@ -242,7 +242,6 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/event/list", s.wrap(s.EventList)) s.mux.HandleFunc("/v1/event/list", s.wrap(s.EventList))
s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint)) s.mux.HandleFunc("/v1/kv/", s.wrap(s.KVSEndpoint))
s.mux.HandleFunc("/v1/kv-txn", s.wrap(s.KVSTxn))
s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate)) s.mux.HandleFunc("/v1/session/create", s.wrap(s.SessionCreate))
s.mux.HandleFunc("/v1/session/destroy/", s.wrap(s.SessionDestroy)) s.mux.HandleFunc("/v1/session/destroy/", s.wrap(s.SessionDestroy))
@ -270,6 +269,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral)) s.mux.HandleFunc("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.mux.HandleFunc("/v1/query/", s.wrap(s.PreparedQuerySpecific)) s.mux.HandleFunc("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.mux.HandleFunc("/v1/txn", s.wrap(s.Txn))
if enableDebug { if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

View File

@ -2,14 +2,12 @@ package agent
import ( import (
"bytes" "bytes"
"encoding/base64"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
) )
@ -286,116 +284,3 @@ func conflictingFlags(resp http.ResponseWriter, req *http.Request, flags ...stri
return false return false
} }
// fixupValues takes the raw decoded JSON and base64 decodes all the values,
// replacing them with byte arrays with the data.
func fixupValues(raw interface{}) error {
// decodeValue decodes the value member of the given operation.
decodeValue := func(rawOp interface{}) error {
rawMap, ok := rawOp.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp)
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "value":
// Leave the byte slice nil if we have a nil
// value.
if v == nil {
return nil
}
// Otherwise, base64 decode it.
s, ok := v.(string)
if !ok {
return fmt.Errorf("unexpected value type: %T", v)
}
decoded, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return fmt.Errorf("failed to decode value: %v", err)
}
rawMap[k] = decoded
return nil
}
}
return nil
}
rawSlice, ok := raw.([]interface{})
if !ok {
return fmt.Errorf("unexpected raw type: %t", raw)
}
for _, rawOp := range rawSlice {
if err := decodeValue(rawOp); err != nil {
return err
}
}
return nil
}
// KVSTxn handles requests to apply multiple KVS operations in a single, atomic
// transaction.
func (s *HTTPServer) KVSTxn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.KVSAtomicRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var txn api.KVTxn
if err := decodeBody(req, &txn, fixupValues); err != nil {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
return nil, nil
}
// Convert the API format into the RPC format. Note that fixupValues
// above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over.
for _, in := range txn {
out := &structs.KVSAtomicOp{
Op: structs.KVSOp(in.Op),
DirEnt: structs.DirEntry{
Key: in.Key,
Value: in.Value,
Flags: in.Flags,
Session: in.Session,
RaftIndex: structs.RaftIndex{
ModifyIndex: in.Index,
},
},
}
args.Ops = append(args.Ops, out)
}
// Make the request and return a conflict status if there were errors
// reported from the transaction.
var reply structs.KVSAtomicResponse
if err := s.agent.RPC("KVS.AtomicApply", &args, &reply); err != nil {
return nil, err
}
if len(reply.Errors) > 0 {
var buf []byte
var err error
buf, err = s.marshalJSON(req, reply)
if err != nil {
return nil, err
}
resp.Header().Set("Content-Type", "application/json")
resp.WriteHeader(http.StatusConflict)
resp.Write(buf)
return nil, nil
}
// Otherwise, return the results of the successful transaction.
return reply, nil
}

View File

@ -573,219 +573,3 @@ func TestKVSEndpoint_DELETE_ConflictingFlags(t *testing.T) {
} }
}) })
} }
func TestKVSEndpoint_Txn(t *testing.T) {
// Bad JSON.
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte("{"))
req, err := http.NewRequest("PUT", "/v1/kv-txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.KVSTxn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 400 {
t.Fatalf("expected 400, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("Failed to parse")) {
t.Fatalf("expected conflicting args error")
}
})
// Bad request.
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte("{"))
req, err := http.NewRequest("GET", "/v1/kv-txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.KVSTxn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("expected 405, got %d", resp.Code)
}
})
// Make sure all incoming fields get converted properly to the internal
// RPC format.
httpTest(t, func(srv *HTTPServer) {
var index uint64
id := makeTestSession(t, srv)
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"Op": "lock",
"Key": "key",
"Value": "aGVsbG8gd29ybGQ=",
"Flags": 23,
"Session": %q
},
{
"Op": "get",
"Key": "key"
}
]
`, id)))
req, err := http.NewRequest("PUT", "/v1/kv-txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSTxn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
atomic, ok := obj.(structs.KVSAtomicResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(atomic.Results) != 2 {
t.Fatalf("bad: %v", atomic)
}
index = atomic.Results[0].ModifyIndex
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "key",
Value: nil,
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
&structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
}
if !reflect.DeepEqual(atomic, expected) {
t.Fatalf("bad: %v", atomic)
}
}
// Now that we have an index we can do a CAS to make sure the
// index field gets translated to the RPC format.
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"Op": "cas",
"Key": "key",
"Value": "Z29vZGJ5ZSB3b3JsZA==",
"Index": %d
},
{
"Op": "get",
"Key": "key"
}
]
`, index)))
req, err := http.NewRequest("PUT", "/v1/kv-txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.KVSTxn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
atomic, ok := obj.(structs.KVSAtomicResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(atomic.Results) != 2 {
t.Fatalf("bad: %v", atomic)
}
modIndex := atomic.Results[0].ModifyIndex
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "key",
Value: nil,
Session: id,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
&structs.DirEntry{
Key: "key",
Value: []byte("goodbye world"),
Session: id,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
},
}
for _, r := range atomic.Results {
fmt.Printf("%v\n", *r)
}
if !reflect.DeepEqual(atomic, expected) {
t.Fatalf("bad: %v", atomic)
}
}
})
// Verify an error inside a transaction.
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte(`
[
{
"Op": "lock",
"Key": "key",
"Value": "aGVsbG8gd29ybGQ=",
"Session": "nope"
},
{
"Op": "get",
"Key": "key"
}
]
`))
req, err := http.NewRequest("PUT", "/v1/kv-txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.KVSTxn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 409 {
t.Fatalf("expected 409, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) {
t.Fatalf("bad: %s", resp.Body.String())
}
})
}

View File

@ -0,0 +1,126 @@
package agent
import (
"encoding/base64"
"fmt"
"net/http"
"strings"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/consul/structs"
)
// fixupValues takes the raw decoded JSON and base64 decodes all the values,
// replacing them with byte arrays with the data.
func fixupValues(raw interface{}) error {
// decodeValue decodes the value member of the given operation.
decodeValue := func(rawOp interface{}) error {
rawMap, ok := rawOp.(map[string]interface{})
if !ok {
return fmt.Errorf("unexpected raw op type: %T", rawOp)
}
for k, v := range rawMap {
switch strings.ToLower(k) {
case "value":
// Leave the byte slice nil if we have a nil
// value.
if v == nil {
return nil
}
// Otherwise, base64 decode it.
s, ok := v.(string)
if !ok {
return fmt.Errorf("unexpected value type: %T", v)
}
decoded, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return fmt.Errorf("failed to decode value: %v", err)
}
rawMap[k] = decoded
return nil
}
}
return nil
}
rawSlice, ok := raw.([]interface{})
if !ok {
return fmt.Errorf("unexpected raw type: %t", raw)
}
for _, rawOp := range rawSlice {
if err := decodeValue(rawOp); err != nil {
return err
}
}
return nil
}
// Txn handles requests to apply multiple operations in a single, atomic
// transaction.
func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
if req.Method != "PUT" {
resp.WriteHeader(http.StatusMethodNotAllowed)
return nil, nil
}
var args structs.KVSAtomicRequest
s.parseDC(req, &args.Datacenter)
s.parseToken(req, &args.Token)
// Note the body is in API format, and not the RPC format. If we can't
// decode it, we will return a 400 since we don't have enough context to
// associate the error with a given operation.
var txn api.KVTxn
if err := decodeBody(req, &txn, fixupValues); err != nil {
resp.WriteHeader(http.StatusBadRequest)
resp.Write([]byte(fmt.Sprintf("Failed to parse body: %v", err)))
return nil, nil
}
// Convert the API format into the RPC format. Note that fixupValues
// above will have already converted the base64 encoded strings into
// byte arrays so we can assign right over.
for _, in := range txn {
// TODO @slackpad - Verify the size here, or move that down into
// the endpoint.
out := &structs.KVSAtomicOp{
Op: structs.KVSOp(in.Op),
DirEnt: structs.DirEntry{
Key: in.Key,
Value: in.Value,
Flags: in.Flags,
Session: in.Session,
RaftIndex: structs.RaftIndex{
ModifyIndex: in.Index,
},
},
}
args.Ops = append(args.Ops, out)
}
// Make the request and return a conflict status if there were errors
// reported from the transaction.
var reply structs.KVSAtomicResponse
if err := s.agent.RPC("Txn.Apply", &args, &reply); err != nil {
return nil, err
}
if len(reply.Errors) > 0 {
var buf []byte
var err error
buf, err = s.marshalJSON(req, reply)
if err != nil {
return nil, err
}
resp.Header().Set("Content-Type", "application/json")
resp.WriteHeader(http.StatusConflict)
resp.Write(buf)
return nil, nil
}
// Otherwise, return the results of the successful transaction.
return reply, nil
}

View File

@ -0,0 +1,227 @@
package agent
import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"github.com/hashicorp/consul/consul/structs"
)
func TestTxnEndpoint_Bad_JSON(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte("{"))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 400 {
t.Fatalf("expected 400, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("Failed to parse")) {
t.Fatalf("expected conflicting args error")
}
})
}
func TestTxnEndpoint_Bad_Method(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte("{}"))
req, err := http.NewRequest("GET", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err := srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 405 {
t.Fatalf("expected 405, got %d", resp.Code)
}
})
}
func TestTxnEndpoint_KVS_Actions(t *testing.T) {
httpTest(t, func(srv *HTTPServer) {
// Make sure all incoming fields get converted properly to the internal
// RPC format.
var index uint64
id := makeTestSession(t, srv)
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"Op": "lock",
"Key": "key",
"Value": "aGVsbG8gd29ybGQ=",
"Flags": 23,
"Session": %q
},
{
"Op": "get",
"Key": "key"
}
]
`, id)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
atomic, ok := obj.(structs.KVSAtomicResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(atomic.Results) != 2 {
t.Fatalf("bad: %v", atomic)
}
index = atomic.Results[0].ModifyIndex
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "key",
Value: nil,
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
&structs.DirEntry{
Key: "key",
Value: []byte("hello world"),
Flags: 23,
Session: id,
LockIndex: 1,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: index,
},
},
},
}
if !reflect.DeepEqual(atomic, expected) {
t.Fatalf("bad: %v", atomic)
}
}
// Now that we have an index we can do a CAS to make sure the
// index field gets translated to the RPC format.
{
buf := bytes.NewBuffer([]byte(fmt.Sprintf(`
[
{
"Op": "cas",
"Key": "key",
"Value": "Z29vZGJ5ZSB3b3JsZA==",
"Index": %d
},
{
"Op": "get",
"Key": "key"
}
]
`, index)))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
obj, err := srv.Txn(resp, req)
if err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 200 {
t.Fatalf("expected 200, got %d", resp.Code)
}
atomic, ok := obj.(structs.KVSAtomicResponse)
if !ok {
t.Fatalf("bad type: %T", obj)
}
if len(atomic.Results) != 2 {
t.Fatalf("bad: %v", atomic)
}
modIndex := atomic.Results[0].ModifyIndex
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "key",
Value: nil,
Session: id,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
&structs.DirEntry{
Key: "key",
Value: []byte("goodbye world"),
Session: id,
RaftIndex: structs.RaftIndex{
CreateIndex: index,
ModifyIndex: modIndex,
},
},
},
}
if !reflect.DeepEqual(atomic, expected) {
t.Fatalf("bad: %v", atomic)
}
}
})
// Verify an error inside a transaction.
httpTest(t, func(srv *HTTPServer) {
buf := bytes.NewBuffer([]byte(`
[
{
"Op": "lock",
"Key": "key",
"Value": "aGVsbG8gd29ybGQ=",
"Session": "nope"
},
{
"Op": "get",
"Key": "key"
}
]
`))
req, err := http.NewRequest("PUT", "/v1/txn", buf)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := httptest.NewRecorder()
if _, err = srv.Txn(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 409 {
t.Fatalf("expected 409, got %d", resp.Code)
}
if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) {
t.Fatalf("bad: %s", resp.Body.String())
}
})
}

View File

@ -17,7 +17,7 @@ type KVS struct {
// preApply does all the verification of a KVS update that is performed BEFORE // preApply does all the verification of a KVS update that is performed BEFORE
// we submit as a Raft log entry. This includes enforcing the lock delay which // we submit as a Raft log entry. This includes enforcing the lock delay which
// must only be done on the leader. // must only be done on the leader.
func (k *KVS) preApply(acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry) (bool, error) { func kvsPreApply(srv *Server, acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry) (bool, error) {
// Verify the entry. // Verify the entry.
if dirEnt.Key == "" && op != structs.KVSDeleteTree { if dirEnt.Key == "" && op != structs.KVSDeleteTree {
return false, fmt.Errorf("Must provide key") return false, fmt.Errorf("Must provide key")
@ -52,10 +52,10 @@ func (k *KVS) preApply(acl acl.ACL, op structs.KVSOp, dirEnt *structs.DirEntry)
// Instead, the lock-delay must be enforced before commit. This means that // Instead, the lock-delay must be enforced before commit. This means that
// only the wall-time of the leader node is used, preventing any inconsistencies. // only the wall-time of the leader node is used, preventing any inconsistencies.
if op == structs.KVSLock { if op == structs.KVSLock {
state := k.srv.fsm.State() state := srv.fsm.State()
expires := state.KVSLockDelay(dirEnt.Key) expires := state.KVSLockDelay(dirEnt.Key)
if expires.After(time.Now()) { if expires.After(time.Now()) {
k.srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v", srv.logger.Printf("[WARN] consul.kvs: Rejecting lock of %s due to lock-delay until %v",
dirEnt.Key, expires) dirEnt.Key, expires)
return false, nil return false, nil
} }
@ -76,7 +76,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
if err != nil { if err != nil {
return err return err
} }
ok, err := k.preApply(acl, args.Op, &args.DirEnt) ok, err := kvsPreApply(k.srv, acl, args.Op, &args.DirEnt)
if err != nil { if err != nil {
return err return err
} }
@ -102,52 +102,6 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
return nil return nil
} }
// AtomicApply is used to apply multiple KVS operations in a single, atomic
// transaction.
func (k *KVS) AtomicApply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResponse) error {
if done, err := k.srv.forward("KVS.AtomicApply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "kvs", "apply-atomic"}, time.Now())
// Perform the pre-apply checks on each of the operations.
acl, err := k.srv.resolveToken(args.Token)
if err != nil {
return err
}
for i, op := range args.Ops {
ok, err := k.preApply(acl, op.Op, &op.DirEnt)
if err != nil {
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
} else if !ok {
err = fmt.Errorf("failed to lock key %q due to lock delay", op.DirEnt.Key)
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
}
}
if len(reply.Errors) > 0 {
return nil
}
// Apply the update.
resp, err := k.srv.raftApply(structs.KVSAtomicRequestType, args)
if err != nil {
k.srv.logger.Printf("[ERR] consul.kvs: ApplyAtomic failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Convert the return type. This should be a cheap copy since we are
// just taking the two slices.
if respAtomic, ok := resp.(structs.KVSAtomicResponse); ok {
*reply = respAtomic
} else {
return fmt.Errorf("unexpected return type %T", resp)
}
return nil
}
// Get is used to lookup a single key. // Get is used to lookup a single key.
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
if done, err := k.srv.forward("KVS.Get", args, args, reply); done { if done, err := k.srv.forward("KVS.Get", args, args, reply); done {

View File

@ -1,9 +1,7 @@
package consul package consul
import ( import (
"bytes"
"os" "os"
"reflect"
"strings" "strings"
"testing" "testing"
"time" "time"
@ -131,198 +129,6 @@ func TestKVS_Apply_ACLDeny(t *testing.T) {
} }
} }
func TestKVS_AtomicApply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
}
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should not be nil")
}
if d.Flags != 42 ||
!bytes.Equal(d.Value, []byte("test")) {
t.Fatalf("bad: %v", d)
}
// Verify the transaction's return value.
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "test",
Flags: 42,
Value: nil,
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
&structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestKVS_AtomicApply_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
WriteRequest: structs.WriteRequest{Token: id},
}
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
var expected structs.KVSAtomicResponse
for i, _ := range arg.Ops {
expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()})
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestKVS_Get(t *testing.T) { func TestKVS_Get(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -881,89 +687,6 @@ func TestKVS_Apply_LockDelay(t *testing.T) {
} }
} }
func TestKVS_AtomicApply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request via an atomic transaction.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validId,
},
},
},
}
{
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 0 ||
len(out.Errors) != 1 ||
out.Errors[0].OpIndex != 0 ||
!strings.Contains(out.Errors[0].What, "due to lock delay") {
t.Fatalf("bad: %v", out)
}
}
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire.
{
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "KVS.AtomicApply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 1 ||
len(out.Errors) != 0 ||
out.Results[0].LockIndex != 2 {
t.Fatalf("bad: %v", out)
}
}
}
func TestKVS_Issue_1626(t *testing.T) { func TestKVS_Issue_1626(t *testing.T) {
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)

View File

@ -165,6 +165,7 @@ type endpoints struct {
ACL *ACL ACL *ACL
Coordinate *Coordinate Coordinate *Coordinate
PreparedQuery *PreparedQuery PreparedQuery *PreparedQuery
Txn *Txn
} }
// NewServer is used to construct a new Consul server from the // NewServer is used to construct a new Consul server from the
@ -441,6 +442,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.ACL = &ACL{s} s.endpoints.ACL = &ACL{s}
s.endpoints.Coordinate = NewCoordinate(s) s.endpoints.Coordinate = NewCoordinate(s)
s.endpoints.PreparedQuery = &PreparedQuery{s} s.endpoints.PreparedQuery = &PreparedQuery{s}
s.endpoints.Txn = &Txn{s}
// Register the handlers // Register the handlers
s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Status)
@ -452,6 +454,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.ACL) s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Coordinate) s.rpcServer.Register(s.endpoints.Coordinate)
s.rpcServer.Register(s.endpoints.PreparedQuery) s.rpcServer.Register(s.endpoints.PreparedQuery)
s.rpcServer.Register(s.endpoints.Txn)
list, err := net.ListenTCP("tcp", s.config.RPCAddr) list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil { if err != nil {

59
consul/txn_endpoint.go Normal file
View File

@ -0,0 +1,59 @@
package consul
import (
"fmt"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
)
// Txn endpoint is used to perform multi-object atomic transactions.
type Txn struct {
srv *Server
}
// Apply is used to apply multiple operations in a single, atomic transaction.
func (t *Txn) Apply(args *structs.KVSAtomicRequest, reply *structs.KVSAtomicResponse) error {
if done, err := t.srv.forward("Txn.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "txn", "apply"}, time.Now())
// Perform the pre-apply checks on each of the operations.
acl, err := t.srv.resolveToken(args.Token)
if err != nil {
return err
}
for i, op := range args.Ops {
ok, err := kvsPreApply(t.srv, acl, op.Op, &op.DirEnt)
if err != nil {
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
} else if !ok {
err = fmt.Errorf("failed to lock key %q due to lock delay", op.DirEnt.Key)
reply.Errors = append(reply.Errors, &structs.KVSAtomicError{i, err.Error()})
}
}
if len(reply.Errors) > 0 {
return nil
}
// Apply the update.
resp, err := t.srv.raftApply(structs.KVSAtomicRequestType, args)
if err != nil {
t.srv.logger.Printf("[ERR] consul.kvs: ApplyAtomic failed: %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
// Convert the return type. This should be a cheap copy since we are
// just taking the two slices.
if respAtomic, ok := resp.(structs.KVSAtomicResponse); ok {
*reply = respAtomic
} else {
return fmt.Errorf("unexpected return type %T", resp)
}
return nil
}

288
consul/txn_endpoint_test.go Normal file
View File

@ -0,0 +1,288 @@
package consul
import (
"bytes"
"os"
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestTxn_Apply(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Do a super basic request. The state store test covers the details so
// we just need to be sure that the transaction is sent correctly and
// the results are converted appropriately.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "test",
},
},
},
}
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the state store directly.
state := s1.fsm.State()
_, d, err := state.KVSGet("test")
if err != nil {
t.Fatalf("err: %v", err)
}
if d == nil {
t.Fatalf("should not be nil")
}
if d.Flags != 42 ||
!bytes.Equal(d.Value, []byte("test")) {
t.Fatalf("bad: %v", d)
}
// Verify the transaction's return value.
expected := structs.KVSAtomicResponse{
Results: structs.DirEntries{
&structs.DirEntry{
Key: "test",
Flags: 42,
Value: nil,
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
&structs.DirEntry{
Key: "test",
Flags: 42,
Value: []byte("test"),
RaftIndex: structs.RaftIndex{
CreateIndex: d.CreateIndex,
ModifyIndex: d.ModifyIndex,
},
},
},
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Apply_ACLDeny(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.ACLDatacenter = "dc1"
c.ACLMasterToken = "root"
c.ACLDefaultPolicy = "deny"
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create the ACL.
var id string
{
arg := structs.ACLRequest{
Datacenter: "dc1",
Op: structs.ACLSet,
ACL: structs.ACL{
Name: "User token",
Type: structs.ACLTypeClient,
Rules: testListRules,
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
t.Fatalf("err: %v", err)
}
}
// Set up a transaction where every operation should get blocked due to
// ACLs.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSSet,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDelete,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteCAS,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSDeleteTree,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSCAS,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSUnlock,
DirEnt: structs.DirEntry{
Key: "foo",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicGet,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckSession,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
&structs.KVSAtomicOp{
Op: structs.KVSAtomicCheckIndex,
DirEnt: structs.DirEntry{
Key: "nope",
},
},
},
WriteRequest: structs.WriteRequest{Token: id},
}
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify the transaction's return value.
var expected structs.KVSAtomicResponse
for i, _ := range arg.Ops {
expected.Errors = append(expected.Errors, &structs.KVSAtomicError{i, permissionDeniedErr.Error()})
}
if !reflect.DeepEqual(out, expected) {
t.Fatalf("bad %v", out)
}
}
func TestTxn_Apply_LockDelay(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testutil.WaitForLeader(t, s1.RPC, "dc1")
// Create and invalidate a session with a lock.
state := s1.fsm.State()
if err := state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"}); err != nil {
t.Fatalf("err: %v", err)
}
session := &structs.Session{
ID: generateUUID(),
Node: "foo",
LockDelay: 50 * time.Millisecond,
}
if err := state.SessionCreate(2, session); err != nil {
t.Fatalf("err: %v", err)
}
id := session.ID
d := &structs.DirEntry{
Key: "test",
Session: id,
}
if ok, err := state.KVSLock(3, d); err != nil || !ok {
t.Fatalf("err: %v", err)
}
if err := state.SessionDestroy(4, id); err != nil {
t.Fatalf("err: %v", err)
}
// Make a new session that is valid.
if err := state.SessionCreate(5, session); err != nil {
t.Fatalf("err: %v", err)
}
validId := session.ID
// Make a lock request via an atomic transaction.
arg := structs.KVSAtomicRequest{
Datacenter: "dc1",
Ops: structs.KVSAtomicOps{
&structs.KVSAtomicOp{
Op: structs.KVSLock,
DirEnt: structs.DirEntry{
Key: "test",
Session: validId,
},
},
},
}
{
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 0 ||
len(out.Errors) != 1 ||
out.Errors[0].OpIndex != 0 ||
!strings.Contains(out.Errors[0].What, "due to lock delay") {
t.Fatalf("bad: %v", out)
}
}
// Wait for lock-delay.
time.Sleep(50 * time.Millisecond)
// Should acquire.
{
var out structs.KVSAtomicResponse
if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
if len(out.Results) != 1 ||
len(out.Errors) != 0 ||
out.Results[0].LockIndex != 2 {
t.Fatalf("bad: %v", out)
}
}
}