From 67bac7a8150394e7acafed6689e7bcb79277dd41 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Wed, 12 Dec 2018 09:14:02 -0800 Subject: [PATCH] api: add support for new txn operations --- agent/consul/acl.go | 15 -- agent/consul/state/catalog.go | 16 ++- agent/consul/state/txn.go | 12 +- agent/consul/state/txn_test.go | 21 +-- agent/consul/txn_endpoint.go | 31 +++++ agent/http.go | 13 +- agent/structs/structs.go | 64 +++++++-- agent/txn_endpoint.go | 114 ++++++++++++++- api/health.go | 58 +++++++- api/health_test.go | 6 +- api/kv.go | 2 +- api/kv_test.go | 2 +- api/txn.go | 37 +++-- api/txn_test.go | 247 +++++++++++++++++++++++++++++++++ 14 files changed, 571 insertions(+), 67 deletions(-) create mode 100644 api/txn_test.go diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 749e0ece16..113cf93d9d 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -1361,11 +1361,6 @@ func vetNodeTxnOp(op *structs.TxnNodeOp, rule acl.Authorizer) error { node := op.Node - // Filtering for GETs is done on the output side. - if op.Verb == api.NodeGet { - return nil - } - n := &api.Node{ Node: node.Node, ID: string(node.ID), @@ -1399,11 +1394,6 @@ func vetServiceTxnOp(op *structs.TxnServiceOp, rule acl.Authorizer) error { service := op.Service - // Filtering for GETs is done on the output side. - if op.Verb == api.ServiceGet { - return nil - } - n := &api.Node{Node: op.Node} svc := &api.AgentService{ ID: service.ID, @@ -1431,11 +1421,6 @@ func vetCheckTxnOp(op *structs.TxnCheckOp, rule acl.Authorizer) error { return nil } - // Filtering for GETs is done on the output side. - if op.Verb == api.CheckGet { - return nil - } - n := &api.Node{Node: op.Check.Node} svc := &api.AgentService{ ID: op.Check.ServiceID, diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index b2de582065..0e21241e00 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -491,14 +491,22 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) { idx := maxIndexTxn(tx, "nodes") // Retrieve the node from the state store - node, err := tx.First("nodes", "id", id) + node, err := getNodeTxn(tx, id) if err != nil { return 0, nil, fmt.Errorf("node lookup failed: %s", err) } - if node != nil { - return idx, node.(*structs.Node), nil + return idx, node, nil +} + +func getNodeTxn(tx *memdb.Txn, nodeName string) (*structs.Node, error) { + node, err := tx.First("nodes", "id", nodeName) + if err != nil { + return nil, fmt.Errorf("node lookup failed: %s", err) } - return idx, nil, nil + if node != nil { + return node.(*structs.Node), nil + } + return nil, nil } func getNodeIDTxn(tx *memdb.Txn, id types.NodeID) (*structs.Node, error) { diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index aa6458f02a..53d16460ad 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -129,7 +129,14 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc switch op.Verb { case api.NodeGet: - entry, err = getNodeIDTxn(tx, op.Node.ID) + if op.Node.ID != "" { + entry, err = getNodeIDTxn(tx, op.Node.ID) + } else { + entry, err = getNodeTxn(tx, op.Node.Node) + } + if entry == nil && err == nil { + err = fmt.Errorf("node %q doesn't exist", op.Node.Node) + } case api.NodeSet: err = s.ensureNodeTxn(tx, idx, &op.Node) @@ -188,6 +195,9 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) switch op.Verb { case api.ServiceGet: entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID) + if entry == nil && err == nil { + err = fmt.Errorf("service %q on node %q doesn't exist", op.Service.ID, op.Node) + } case api.ServiceSet: err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service) diff --git a/agent/consul/state/txn_test.go b/agent/consul/state/txn_test.go index 64af556da2..c21e3a0ea9 100644 --- a/agent/consul/state/txn_test.go +++ b/agent/consul/state/txn_test.go @@ -279,27 +279,32 @@ func TestStateStore_Txn_Service(t *testing.T) { Service: "svc1", Address: "1.1.1.1", Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{ CreateIndex: 2, ModifyIndex: 2, }, + }, + }, + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc5", Weights: &structs.Weights{Passing: 1, Warning: 1}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, }, }, &structs.TxnResult{ Service: &structs.NodeService{ - ID: "svc5", - }, - }, - &structs.TxnResult{ - Service: &structs.NodeService{ - ID: "svc2", - Tags: []string{"modified"}, + ID: "svc2", + Tags: []string{"modified"}, + Weights: &structs.Weights{Passing: 1, Warning: 1}, RaftIndex: structs.RaftIndex{ CreateIndex: 3, ModifyIndex: 6, }, - Weights: &structs.Weights{Passing: 1, Warning: 1}, }, }, } diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index b69113b4eb..b3f2299324 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" ) // Txn endpoint is used to perform multi-object atomic transactions. @@ -37,6 +38,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx }) } case op.Node != nil: + // Skip the pre-apply checks if this is a GET. + if op.Node.Verb == api.NodeGet { + break + } + node := op.Node.Node if err := nodePreApply(node.Node, string(node.ID)); err != nil { errors = append(errors, &structs.TxnError{ @@ -54,6 +60,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx }) } case op.Service != nil: + // Skip the pre-apply checks if this is a GET. + if op.Service.Verb == api.ServiceGet { + break + } + service := &op.Service.Service if err := servicePreApply(service, nil); err != nil { errors = append(errors, &structs.TxnError{ @@ -71,6 +82,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx }) } case op.Check != nil: + // Skip the pre-apply checks if this is a GET. + if op.Check.Verb == api.CheckGet { + break + } + checkPreApply(&op.Check.Check) // Check that the token has permissions for the given operation. @@ -103,6 +119,21 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error return nil } + str := "" + for _, op := range args.Ops { + switch { + case op.KV != nil: + str += fmt.Sprintf("%#v\n", op.KV) + case op.Node != nil: + str += fmt.Sprintf("%#v\n", op.Node) + case op.Service != nil: + str += fmt.Sprintf("%#v\n", op.Service) + case op.Check != nil: + str += fmt.Sprintf("%#v\n", op.Check) + } + } + //return fmt.Errorf("%s", str) + // Apply the update. resp, err := t.srv.raftApply(structs.TxnRequestType, args) if err != nil { diff --git a/agent/http.go b/agent/http.go index 73e9925a2c..b8a80f580c 100644 --- a/agent/http.go +++ b/agent/http.go @@ -446,7 +446,18 @@ func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error) return err } } - return mapstructure.Decode(raw, out) + + decodeConf := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + Result: &out, + } + + decoder, err := mapstructure.NewDecoder(decodeConf) + if err != nil { + return err + } + + return decoder.Decode(raw) } // setTranslateAddr is used to set the address translation header. This is only diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 371edcb40c..a699c9e4a3 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -2,6 +2,7 @@ package structs import ( "bytes" + "encoding/json" "fmt" "math/rand" "reflect" @@ -893,14 +894,61 @@ type HealthCheck struct { } type HealthCheckDefinition struct { - HTTP string `json:",omitempty"` - TLSSkipVerify bool `json:",omitempty"` - Header map[string][]string `json:",omitempty"` - Method string `json:",omitempty"` - TCP string `json:",omitempty"` - Interval api.ReadableDuration `json:",omitempty"` - Timeout api.ReadableDuration `json:",omitempty"` - DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"` + HTTP string `json:",omitempty"` + TLSSkipVerify bool `json:",omitempty"` + Header map[string][]string `json:",omitempty"` + Method string `json:",omitempty"` + TCP string `json:",omitempty"` + Interval time.Duration `json:",omitempty"` + Timeout time.Duration `json:",omitempty"` + DeregisterCriticalServiceAfter time.Duration `json:",omitempty"` +} + +func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) { + type Alias HealthCheckDefinition + return json.Marshal(&struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Interval: d.Interval.String(), + Timeout: d.Timeout.String(), + DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(), + Alias: (*Alias)(d), + }) +} + +func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error { + type Alias HealthCheckDefinition + aux := &struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + var err error + if aux.Interval != "" { + if d.Interval, err = time.ParseDuration(aux.Interval); err != nil { + return err + } + } + if aux.Timeout != "" { + if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil { + return err + } + } + if aux.DeregisterCriticalServiceAfter != "" { + if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil { + return err + } + } + return nil } // IsSame checks if one HealthCheck is the same as another, without looking diff --git a/agent/txn_endpoint.go b/agent/txn_endpoint.go index 4870b0327a..ff7b043d94 100644 --- a/agent/txn_endpoint.go +++ b/agent/txn_endpoint.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" ) const ( @@ -48,9 +49,9 @@ func decodeValue(rawKV interface{}) error { return nil } -// fixupKVOp looks for non-nil KV operations and passes them on for +// fixupTxnOp looks for non-nil Txn operations and passes them on for // value conversion. -func fixupKVOp(rawOp interface{}) error { +func fixupTxnOp(rawOp interface{}) error { rawMap, ok := rawOp.(map[string]interface{}) if !ok { return fmt.Errorf("unexpected raw op type: %T", rawOp) @@ -67,15 +68,15 @@ func fixupKVOp(rawOp interface{}) error { return nil } -// fixupKVOps takes the raw decoded JSON and base64 decodes values in KV ops, +// fixupTxnOps takes the raw decoded JSON and base64 decodes values in Txn ops, // replacing them with byte arrays. -func fixupKVOps(raw interface{}) error { +func fixupTxnOps(raw interface{}) error { rawSlice, ok := raw.([]interface{}) if !ok { return fmt.Errorf("unexpected raw type: %t", raw) } for _, rawOp := range rawSlice { - if err := fixupKVOp(rawOp); err != nil { + if err := fixupTxnOp(rawOp); err != nil { return err } } @@ -100,7 +101,7 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st // decode it, we will return a 400 since we don't have enough context to // associate the error with a given operation. var ops api.TxnOps - if err := decodeBody(req, &ops, fixupKVOps); err != nil { + if err := decodeBody(req, &ops, fixupTxnOps); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Failed to parse body: %v", err) return nil, 0, false @@ -123,7 +124,8 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st var writes int var netKVSize int for _, in := range ops { - if in.KV != nil { + switch { + case in.KV != nil: size := len(in.KV.Value) if size > maxKVSize { resp.WriteHeader(http.StatusRequestEntityTooLarge) @@ -152,6 +154,102 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st }, } opsRPC = append(opsRPC, out) + + case in.Node != nil: + if in.Node.Verb != api.NodeGet { + writes++ + } + + // Setup the default DC if not provided + if in.Node.Node.Datacenter == "" { + in.Node.Node.Datacenter = s.agent.config.Datacenter + } + + node := in.Node.Node + out := &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: in.Node.Verb, + Node: structs.Node{ + ID: types.NodeID(node.ID), + Node: node.Node, + Address: node.Address, + Datacenter: node.Datacenter, + TaggedAddresses: node.TaggedAddresses, + Meta: node.Meta, + RaftIndex: structs.RaftIndex{ + ModifyIndex: node.ModifyIndex, + }, + }, + }, + } + opsRPC = append(opsRPC, out) + + case in.Service != nil: + if in.Service.Verb != api.ServiceGet { + writes++ + } + + svc := in.Service.Service + out := &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: in.Service.Verb, + Node: in.Service.Node, + Service: structs.NodeService{ + ID: svc.ServiceID, + Service: svc.ServiceName, + Tags: svc.ServiceTags, + Address: svc.ServiceAddress, + Meta: svc.ServiceMeta, + Port: svc.ServicePort, + Weights: &structs.Weights{ + Passing: svc.ServiceWeights.Passing, + Warning: svc.ServiceWeights.Warning, + }, + EnableTagOverride: svc.ServiceEnableTagOverride, + RaftIndex: structs.RaftIndex{ + ModifyIndex: svc.ModifyIndex, + }, + }, + }, + } + opsRPC = append(opsRPC, out) + + case in.Check != nil: + if in.Check.Verb != api.CheckGet { + writes++ + } + + check := in.Check.Check + out := &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: in.Check.Verb, + Check: structs.HealthCheck{ + Node: check.Node, + CheckID: types.CheckID(check.CheckID), + Name: check.Name, + Status: check.Status, + Notes: check.Notes, + Output: check.Output, + ServiceID: check.ServiceID, + ServiceName: check.ServiceName, + ServiceTags: check.ServiceTags, + Definition: structs.HealthCheckDefinition{ + HTTP: check.Definition.HTTP, + TLSSkipVerify: check.Definition.TLSSkipVerify, + Header: check.Definition.Header, + Method: check.Definition.Method, + TCP: check.Definition.TCP, + Interval: check.Definition.Interval, + Timeout: check.Definition.Timeout, + DeregisterCriticalServiceAfter: check.Definition.DeregisterCriticalServiceAfter, + }, + RaftIndex: structs.RaftIndex{ + ModifyIndex: check.ModifyIndex, + }, + }, + }, + } + opsRPC = append(opsRPC, out) } } @@ -180,6 +278,7 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface // Fast-path a transaction with only writes to the read-only endpoint, // which bypasses Raft, and allows for staleness. + s.agent.logger.Printf("ops: %d", len(ops)) conflict := false var ret interface{} if writes == 0 { @@ -209,6 +308,7 @@ func (s *HTTPServer) Txn(resp http.ResponseWriter, req *http.Request) (interface return nil, err } ret, conflict = reply, len(reply.Errors) > 0 + s.agent.logger.Printf("results: %d, errors: %d", len(reply.Results), len(reply.Errors)) } // If there was a conflict return the response object but set a special diff --git a/api/health.go b/api/health.go index eae6a01a86..b3f6b41cf8 100644 --- a/api/health.go +++ b/api/health.go @@ -1,8 +1,10 @@ package api import ( + "encoding/json" "fmt" "strings" + "time" ) const ( @@ -36,6 +38,9 @@ type HealthCheck struct { ServiceTags []string Definition HealthCheckDefinition + + CreateIndex uint64 + ModifyIndex uint64 } // HealthCheckDefinition is used to store the details about @@ -46,9 +51,56 @@ type HealthCheckDefinition struct { Method string TLSSkipVerify bool TCP string - Interval ReadableDuration - Timeout ReadableDuration - DeregisterCriticalServiceAfter ReadableDuration + Interval time.Duration + Timeout time.Duration + DeregisterCriticalServiceAfter time.Duration +} + +func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) { + type Alias HealthCheckDefinition + return json.Marshal(&struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Interval: d.Interval.String(), + Timeout: d.Timeout.String(), + DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(), + Alias: (*Alias)(d), + }) +} + +func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error { + type Alias HealthCheckDefinition + aux := &struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + var err error + if aux.Interval != "" { + if d.Interval, err = time.ParseDuration(aux.Interval); err != nil { + return err + } + } + if aux.Timeout != "" { + if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil { + return err + } + } + if aux.DeregisterCriticalServiceAfter != "" { + if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil { + return err + } + } + return nil } // HealthChecks is a collection of HealthCheck structs. diff --git a/api/health_test.go b/api/health_test.go index 7a9147350e..9b25b0f3c6 100644 --- a/api/health_test.go +++ b/api/health_test.go @@ -213,9 +213,9 @@ func TestAPI_HealthChecks(t *testing.T) { if meta.LastIndex == 0 { r.Fatalf("bad: %v", meta) } - if got, want := out, checks; !verify.Values(t, "checks", got, want) { - r.Fatal("health.Checks failed") - } + checks[0].CreateIndex = out[0].CreateIndex + checks[0].ModifyIndex = out[0].ModifyIndex + verify.Values(r, "checks", out, checks) }) } diff --git a/api/kv.go b/api/kv.go index cb79f18728..bd45a067c9 100644 --- a/api/kv.go +++ b/api/kv.go @@ -265,7 +265,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption // The Txn function has been deprecated from the KV object; please see the Txn // object for more information about Transactions. func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) { - ops := make(TxnOps, len(txn)) + var ops TxnOps for _, op := range txn { ops = append(ops, &TxnOp{KV: op}) } diff --git a/api/kv_test.go b/api/kv_test.go index 1ca6cd794a..f19cfc1a78 100644 --- a/api/kv_test.go +++ b/api/kv_test.go @@ -456,7 +456,7 @@ func TestAPI_ClientAcquireRelease(t *testing.T) { } } -func TestAPI_ClientTxn(t *testing.T) { +func TestAPI_KVClientTxn(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() diff --git a/api/txn.go b/api/txn.go index 59864045e6..02ae66252f 100644 --- a/api/txn.go +++ b/api/txn.go @@ -20,8 +20,10 @@ func (c *Client) Txn() *Txn { // TxnOp is the internal format we send to Consul. Currently only K/V and // check operations are supported. type TxnOp struct { - KV *KVTxnOp - Check *CheckTxnOp + KV *KVTxnOp + Node *NodeTxnOp + Service *ServiceTxnOp + Check *CheckTxnOp } // TxnOps is a list of transaction operations. @@ -29,8 +31,10 @@ type TxnOps []*TxnOp // TxnResult is the internal format we receive from Consul. type TxnResult struct { - KV *KVPair - Check *HealthCheck + KV *KVPair + Node *Node + Service *CatalogService + Check *HealthCheck } // TxnResults is a list of TxnResult objects. @@ -100,6 +104,12 @@ const ( NodeDeleteCAS NodeOp = "delete-cas" ) +// NodeTxnOp defines a single operation inside a transaction. +type NodeTxnOp struct { + Verb NodeOp + Node Node +} + // ServiceOp constants give possible operations available in a transaction. type ServiceOp string @@ -111,6 +121,13 @@ const ( ServiceDeleteCAS ServiceOp = "delete-cas" ) +// ServiceTxnOp defines a single operation inside a transaction. +type ServiceTxnOp struct { + Verb ServiceOp + Node string + Service CatalogService +} + // CheckOp constants give possible operations available in a transaction. type CheckOp string @@ -185,17 +202,7 @@ func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMet r := c.newRequest("PUT", "/v1/txn") r.setQueryOptions(q) - // Convert into the internal txn format. - ops := make(TxnOps, 0, len(txn)) - for _, kvOp := range txn { - switch { - case kvOp.KV != nil: - ops = append(ops, &TxnOp{KV: kvOp.KV}) - case kvOp.Check != nil: - ops = append(ops, &TxnOp{Check: kvOp.Check}) - } - } - r.obj = ops + r.obj = txn rtt, resp, err := c.doRequest(r) if err != nil { return false, nil, nil, err diff --git a/api/txn_test.go b/api/txn_test.go new file mode 100644 index 0000000000..74a9cfdf9a --- /dev/null +++ b/api/txn_test.go @@ -0,0 +1,247 @@ +package api + +import ( + "strings" + "testing" + "time" + + "github.com/hashicorp/go-uuid" + + "github.com/pascaldekloe/goe/verify" + "github.com/stretchr/testify/require" +) + +func TestAPI_ClientTxn(t *testing.T) { + t.Parallel() + require := require.New(t) + c, s := makeClient(t) + defer s.Stop() + + session := c.Session() + txn := c.Txn() + + // Set up a test service and health check. + nodeID, err := uuid.GenerateUUID() + require.NoError(err) + + catalog := c.Catalog() + reg := &CatalogRegistration{ + ID: nodeID, + Node: "foo", + Address: "2.2.2.2", + Service: &AgentService{ + ID: "foo1", + Service: "foo", + }, + Check: &AgentCheck{ + CheckID: "bar", + Status: "critical", + Definition: HealthCheckDefinition{ + TCP: "1.1.1.1", + Interval: 5 * time.Second, + }, + }, + } + _, err = catalog.Register(reg, nil) + require.NoError(err) + + node, _, err := catalog.Node("foo", nil) + require.NoError(err) + require.Equal(nodeID, node.Node.ID) + + // Make a session. + id, _, err := session.CreateNoChecks(nil, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + defer session.Destroy(id, nil) + + // Acquire and get the key via a transaction, but don't supply a valid + // session. + key := testKey() + value := []byte("test") + ops := TxnOps{ + &TxnOp{ + KV: &KVTxnOp{ + Verb: KVLock, + Key: key, + Value: value, + }, + }, + &TxnOp{ + KV: &KVTxnOp{ + Verb: KVGet, + Key: key, + }, + }, + &TxnOp{ + Node: &NodeTxnOp{ + Verb: NodeGet, + Node: Node{Node: "foo"}, + }, + }, + &TxnOp{ + Service: &ServiceTxnOp{ + Verb: ServiceGet, + Node: "foo", + Service: CatalogService{ServiceID: "foo1"}, + }, + }, + &TxnOp{ + Check: &CheckTxnOp{ + Verb: CheckGet, + Check: HealthCheck{Node: "foo", CheckID: "bar"}, + }, + }, + } + ok, ret, _, err := txn.Txn(ops, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if ok { + t.Fatalf("transaction should have failed") + } + + if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 { + t.Fatalf("bad: %v", ret.Errors[2]) + } + if ret.Errors[0].OpIndex != 0 || + !strings.Contains(ret.Errors[0].What, "missing session") || + !strings.Contains(ret.Errors[1].What, "doesn't exist") { + t.Fatalf("bad: %v", ret.Errors[0]) + } + + // Now poke in a real session and try again. + ops[0].KV.Session = id + ok, ret, _, err = txn.Txn(ops, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("transaction failure") + } + + if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 5 { + t.Fatalf("bad: %v", ret) + } + expected := TxnResults{ + &TxnResult{ + KV: &KVPair{ + Key: key, + Session: id, + LockIndex: 1, + CreateIndex: ret.Results[0].KV.CreateIndex, + ModifyIndex: ret.Results[0].KV.ModifyIndex, + }, + }, + &TxnResult{ + KV: &KVPair{ + Key: key, + Session: id, + Value: []byte("test"), + LockIndex: 1, + CreateIndex: ret.Results[1].KV.CreateIndex, + ModifyIndex: ret.Results[1].KV.ModifyIndex, + }, + }, + &TxnResult{ + Node: &Node{ + ID: nodeID, + Node: "foo", + Address: "2.2.2.2", + Datacenter: "dc1", + CreateIndex: ret.Results[2].Node.CreateIndex, + ModifyIndex: ret.Results[2].Node.CreateIndex, + }, + }, + &TxnResult{ + Service: &CatalogService{ + ID: "foo1", + CreateIndex: ret.Results[3].Service.CreateIndex, + ModifyIndex: ret.Results[3].Service.CreateIndex, + }, + }, + &TxnResult{ + Check: &HealthCheck{ + Node: "foo", + CheckID: "bar", + Status: "critical", + Definition: HealthCheckDefinition{ + TCP: "1.1.1.1", + Interval: 5 * time.Second, + }, + CreateIndex: ret.Results[4].Check.CreateIndex, + ModifyIndex: ret.Results[4].Check.CreateIndex, + }, + }, + } + verify.Values(t, "", ret.Results, expected) + + // Run a read-only transaction. + ops = TxnOps{ + &TxnOp{ + KV: &KVTxnOp{ + Verb: KVGet, + Key: key, + }, + }, + &TxnOp{ + Node: &NodeTxnOp{ + Verb: NodeGet, + Node: Node{ID: s.Config.NodeID, Node: s.Config.NodeName}, + }, + }, + } + ok, ret, _, err = txn.Txn(ops, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("transaction failure") + } + + expected = TxnResults{ + &TxnResult{ + KV: &KVPair{ + Key: key, + Session: id, + Value: []byte("test"), + LockIndex: 1, + CreateIndex: ret.Results[0].KV.CreateIndex, + ModifyIndex: ret.Results[0].KV.ModifyIndex, + }, + }, + &TxnResult{ + Node: &Node{ + ID: s.Config.NodeID, + Node: s.Config.NodeName, + Address: "127.0.0.1", + Datacenter: "dc1", + TaggedAddresses: map[string]string{ + "lan": s.Config.Bind, + "wan": s.Config.Bind, + }, + Meta: map[string]string{"consul-network-segment": ""}, + CreateIndex: ret.Results[1].Node.CreateIndex, + ModifyIndex: ret.Results[1].Node.ModifyIndex, + }, + }, + } + verify.Values(t, "", ret.Results, expected) + + // Sanity check using the regular GET API. + kv := c.KV() + pair, meta, err := kv.Get(key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if pair == nil { + t.Fatalf("expected value: %#v", pair) + } + if pair.LockIndex != 1 { + t.Fatalf("Expected lock: %v", pair) + } + if pair.Session != id { + t.Fatalf("Expected lock: %v", pair) + } + if meta.LastIndex == 0 { + t.Fatalf("unexpected value: %#v", meta) + } +}