mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
399 lines
8.6 KiB
399 lines
8.6 KiB
package agent |
|
|
|
import ( |
|
"bytes" |
|
"fmt" |
|
"net/http" |
|
"net/http/httptest" |
|
"reflect" |
|
"strings" |
|
"testing" |
|
|
|
"github.com/hashicorp/consul/agent/structs" |
|
) |
|
|
|
func TestTxnEndpoint_Bad_JSON(t *testing.T) { |
|
t.Parallel() |
|
a := NewTestAgent(t.Name(), "") |
|
defer a.Shutdown() |
|
|
|
buf := bytes.NewBuffer([]byte("{")) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
if _, err := a.srv.Txn(resp, req); err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 400 { |
|
t.Fatalf("expected 400, got %d", resp.Code) |
|
} |
|
if !bytes.Contains(resp.Body.Bytes(), []byte("Failed to parse")) { |
|
t.Fatalf("expected conflicting args error") |
|
} |
|
} |
|
|
|
func TestTxnEndpoint_Bad_Size_Item(t *testing.T) { |
|
t.Parallel() |
|
a := NewTestAgent(t.Name(), "") |
|
defer a.Shutdown() |
|
|
|
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` |
|
[ |
|
{ |
|
"KV": { |
|
"Verb": "set", |
|
"Key": "key", |
|
"Value": %q |
|
} |
|
} |
|
] |
|
`, strings.Repeat("bad", 2*maxKVSize)))) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
if _, err := a.srv.Txn(resp, req); err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 413 { |
|
t.Fatalf("expected 413, got %d", resp.Code) |
|
} |
|
} |
|
|
|
func TestTxnEndpoint_Bad_Size_Net(t *testing.T) { |
|
t.Parallel() |
|
a := NewTestAgent(t.Name(), "") |
|
defer a.Shutdown() |
|
|
|
value := strings.Repeat("X", maxKVSize/2) |
|
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` |
|
[ |
|
{ |
|
"KV": { |
|
"Verb": "set", |
|
"Key": "key1", |
|
"Value": %q |
|
} |
|
}, |
|
{ |
|
"KV": { |
|
"Verb": "set", |
|
"Key": "key1", |
|
"Value": %q |
|
} |
|
}, |
|
{ |
|
"KV": { |
|
"Verb": "set", |
|
"Key": "key1", |
|
"Value": %q |
|
} |
|
} |
|
] |
|
`, value, value, value))) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
if _, err := a.srv.Txn(resp, req); err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 413 { |
|
t.Fatalf("expected 413, got %d", resp.Code) |
|
} |
|
} |
|
|
|
func TestTxnEndpoint_Bad_Size_Ops(t *testing.T) { |
|
t.Parallel() |
|
a := NewTestAgent(t.Name(), "") |
|
defer a.Shutdown() |
|
|
|
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` |
|
[ |
|
%s |
|
{ |
|
"KV": { |
|
"Verb": "set", |
|
"Key": "key", |
|
"Value": "" |
|
} |
|
} |
|
] |
|
`, strings.Repeat(`{ "KV": { "Verb": "get", "Key": "key" } },`, 2*maxTxnOps)))) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
if _, err := a.srv.Txn(resp, req); err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 413 { |
|
t.Fatalf("expected 413, got %d", resp.Code) |
|
} |
|
} |
|
|
|
func TestTxnEndpoint_KV_Actions(t *testing.T) { |
|
t.Parallel() |
|
t.Run("", func(t *testing.T) { |
|
a := NewTestAgent(t.Name(), "") |
|
defer a.Shutdown() |
|
|
|
// Make sure all incoming fields get converted properly to the internal |
|
// RPC format. |
|
var index uint64 |
|
id := makeTestSession(t, a.srv) |
|
{ |
|
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` |
|
[ |
|
{ |
|
"KV": { |
|
"Verb": "lock", |
|
"Key": "key", |
|
"Value": "aGVsbG8gd29ybGQ=", |
|
"Flags": 23, |
|
"Session": %q |
|
} |
|
}, |
|
{ |
|
"KV": { |
|
"Verb": "get", |
|
"Key": "key" |
|
} |
|
} |
|
] |
|
`, id))) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
obj, err := a.srv.Txn(resp, req) |
|
if err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 200 { |
|
t.Fatalf("expected 200, got %d", resp.Code) |
|
} |
|
|
|
txnResp, ok := obj.(structs.TxnResponse) |
|
if !ok { |
|
t.Fatalf("bad type: %T", obj) |
|
} |
|
if len(txnResp.Results) != 2 { |
|
t.Fatalf("bad: %v", txnResp) |
|
} |
|
index = txnResp.Results[0].KV.ModifyIndex |
|
expected := structs.TxnResponse{ |
|
Results: structs.TxnResults{ |
|
&structs.TxnResult{ |
|
KV: &structs.DirEntry{ |
|
Key: "key", |
|
Value: nil, |
|
Flags: 23, |
|
Session: id, |
|
LockIndex: 1, |
|
RaftIndex: structs.RaftIndex{ |
|
CreateIndex: index, |
|
ModifyIndex: index, |
|
}, |
|
}, |
|
}, |
|
&structs.TxnResult{ |
|
KV: &structs.DirEntry{ |
|
Key: "key", |
|
Value: []byte("hello world"), |
|
Flags: 23, |
|
Session: id, |
|
LockIndex: 1, |
|
RaftIndex: structs.RaftIndex{ |
|
CreateIndex: index, |
|
ModifyIndex: index, |
|
}, |
|
}, |
|
}, |
|
}, |
|
} |
|
if !reflect.DeepEqual(txnResp, expected) { |
|
t.Fatalf("bad: %v", txnResp) |
|
} |
|
} |
|
|
|
// Do a read-only transaction that should get routed to the |
|
// fast-path endpoint. |
|
{ |
|
buf := bytes.NewBuffer([]byte(` |
|
[ |
|
{ |
|
"KV": { |
|
"Verb": "get", |
|
"Key": "key" |
|
} |
|
}, |
|
{ |
|
"KV": { |
|
"Verb": "get-tree", |
|
"Key": "key" |
|
} |
|
} |
|
] |
|
`)) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
obj, err := a.srv.Txn(resp, req) |
|
if err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 200 { |
|
t.Fatalf("expected 200, got %d", resp.Code) |
|
} |
|
|
|
header := resp.Header().Get("X-Consul-KnownLeader") |
|
if header != "true" { |
|
t.Fatalf("bad: %v", header) |
|
} |
|
header = resp.Header().Get("X-Consul-LastContact") |
|
if header != "0" { |
|
t.Fatalf("bad: %v", header) |
|
} |
|
|
|
txnResp, ok := obj.(structs.TxnReadResponse) |
|
if !ok { |
|
t.Fatalf("bad type: %T", obj) |
|
} |
|
expected := structs.TxnReadResponse{ |
|
TxnResponse: structs.TxnResponse{ |
|
Results: structs.TxnResults{ |
|
&structs.TxnResult{ |
|
KV: &structs.DirEntry{ |
|
Key: "key", |
|
Value: []byte("hello world"), |
|
Flags: 23, |
|
Session: id, |
|
LockIndex: 1, |
|
RaftIndex: structs.RaftIndex{ |
|
CreateIndex: index, |
|
ModifyIndex: index, |
|
}, |
|
}, |
|
}, |
|
&structs.TxnResult{ |
|
KV: &structs.DirEntry{ |
|
Key: "key", |
|
Value: []byte("hello world"), |
|
Flags: 23, |
|
Session: id, |
|
LockIndex: 1, |
|
RaftIndex: structs.RaftIndex{ |
|
CreateIndex: index, |
|
ModifyIndex: index, |
|
}, |
|
}, |
|
}, |
|
}, |
|
}, |
|
QueryMeta: structs.QueryMeta{ |
|
KnownLeader: true, |
|
}, |
|
} |
|
if !reflect.DeepEqual(txnResp, expected) { |
|
t.Fatalf("bad: %v", txnResp) |
|
} |
|
} |
|
|
|
// Now that we have an index we can do a CAS to make sure the |
|
// index field gets translated to the RPC format. |
|
{ |
|
buf := bytes.NewBuffer([]byte(fmt.Sprintf(` |
|
[ |
|
{ |
|
"KV": { |
|
"Verb": "cas", |
|
"Key": "key", |
|
"Value": "Z29vZGJ5ZSB3b3JsZA==", |
|
"Index": %d |
|
} |
|
}, |
|
{ |
|
"KV": { |
|
"Verb": "get", |
|
"Key": "key" |
|
} |
|
} |
|
] |
|
`, index))) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
obj, err := a.srv.Txn(resp, req) |
|
if err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 200 { |
|
t.Fatalf("expected 200, got %d", resp.Code) |
|
} |
|
|
|
txnResp, ok := obj.(structs.TxnResponse) |
|
if !ok { |
|
t.Fatalf("bad type: %T", obj) |
|
} |
|
if len(txnResp.Results) != 2 { |
|
t.Fatalf("bad: %v", txnResp) |
|
} |
|
modIndex := txnResp.Results[0].KV.ModifyIndex |
|
expected := structs.TxnResponse{ |
|
Results: structs.TxnResults{ |
|
&structs.TxnResult{ |
|
KV: &structs.DirEntry{ |
|
Key: "key", |
|
Value: nil, |
|
Session: id, |
|
RaftIndex: structs.RaftIndex{ |
|
CreateIndex: index, |
|
ModifyIndex: modIndex, |
|
}, |
|
}, |
|
}, |
|
&structs.TxnResult{ |
|
KV: &structs.DirEntry{ |
|
Key: "key", |
|
Value: []byte("goodbye world"), |
|
Session: id, |
|
RaftIndex: structs.RaftIndex{ |
|
CreateIndex: index, |
|
ModifyIndex: modIndex, |
|
}, |
|
}, |
|
}, |
|
}, |
|
} |
|
if !reflect.DeepEqual(txnResp, expected) { |
|
t.Fatalf("bad: %v", txnResp) |
|
} |
|
} |
|
}) |
|
|
|
// Verify an error inside a transaction. |
|
t.Run("", func(t *testing.T) { |
|
a := NewTestAgent(t.Name(), "") |
|
defer a.Shutdown() |
|
|
|
buf := bytes.NewBuffer([]byte(` |
|
[ |
|
{ |
|
"KV": { |
|
"Verb": "lock", |
|
"Key": "key", |
|
"Value": "aGVsbG8gd29ybGQ=", |
|
"Session": "nope" |
|
} |
|
}, |
|
{ |
|
"KV": { |
|
"Verb": "get", |
|
"Key": "key" |
|
} |
|
} |
|
] |
|
`)) |
|
req, _ := http.NewRequest("PUT", "/v1/txn", buf) |
|
resp := httptest.NewRecorder() |
|
if _, err := a.srv.Txn(resp, req); err != nil { |
|
t.Fatalf("err: %v", err) |
|
} |
|
if resp.Code != 409 { |
|
t.Fatalf("expected 409, got %d", resp.Code) |
|
} |
|
if !bytes.Contains(resp.Body.Bytes(), []byte("failed session lookup")) { |
|
t.Fatalf("bad: %s", resp.Body.String()) |
|
} |
|
}) |
|
}
|
|
|