agent/consul: Intention.Apply, FSM methods, very little validation

pull/4275/head
Mitchell Hashimoto 2018-02-28 10:28:07 -08:00
parent 212a272989
commit 9e307e178e
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
5 changed files with 140 additions and 0 deletions

View File

@ -20,6 +20,7 @@ func init() {
registerCommand(structs.PreparedQueryRequestType, (*FSM).applyPreparedQueryOperation)
registerCommand(structs.TxnRequestType, (*FSM).applyTxn)
registerCommand(structs.AutopilotRequestType, (*FSM).applyAutopilotUpdate)
registerCommand(structs.IntentionRequestType, (*FSM).applyIntentionOperation)
}
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -246,3 +247,26 @@ func (c *FSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
}
return c.state.AutopilotSetConfig(index, &req.Config)
}
// applyIntentionOperation applies the given intention operation to the state store.
func (c *FSM) applyIntentionOperation(buf []byte, index uint64) interface{} {
var req structs.IntentionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSinceWithLabels([]string{"consul", "fsm", "intention"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
defer metrics.MeasureSinceWithLabels([]string{"fsm", "intention"}, time.Now(),
[]metrics.Label{{Name: "op", Value: string(req.Op)}})
switch req.Op {
case structs.IntentionOpCreate, structs.IntentionOpUpdate:
return c.state.IntentionSet(index, req.Intention)
case structs.IntentionOpDelete:
panic("TODO")
//return c.state.PreparedQueryDelete(index, req.Query.ID)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Intention operation '%s'", req.Op)
return fmt.Errorf("Invalid Intention operation '%s'", req.Op)
}
}

View File

@ -1,9 +1,13 @@
package consul
import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-uuid"
)
// Intention manages the Connect intentions.
@ -12,6 +16,56 @@ type Intention struct {
srv *Server
}
// Apply creates or updates an intention in the data store.
func (s *Intention) Apply(
args *structs.IntentionRequest,
reply *string) error {
if done, err := s.srv.forward("Intention.Apply", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now())
defer metrics.MeasureSince([]string{"intention", "apply"}, time.Now())
// If no ID is provided, generate a new ID. This must be done prior to
// appending to the Raft log, because the ID is not deterministic. Once
// the entry is in the log, the state update MUST be deterministic or
// the followers will not converge.
if args.Op == structs.IntentionOpCreate && args.Intention.ID == "" {
state := s.srv.fsm.State()
for {
var err error
args.Intention.ID, err = uuid.GenerateUUID()
if err != nil {
s.srv.logger.Printf("[ERR] consul.intention: UUID generation failed: %v", err)
return err
}
_, ixn, err := state.IntentionGet(nil, args.Intention.ID)
if err != nil {
s.srv.logger.Printf("[ERR] consul.intention: intention lookup failed: %v", err)
return err
}
if ixn == nil {
break
}
}
}
*reply = args.Intention.ID
// Commit
resp, err := s.srv.raftApply(structs.IntentionRequestType, args)
if err != nil {
s.srv.logger.Printf("[ERR] consul.intention: Apply failed %v", err)
return err
}
if respErr, ok := resp.(error); ok {
return respErr
}
return nil
}
// List returns all the intentions.
func (s *Intention) List(
args *structs.DCSpecificRequest,
reply *structs.IndexedIntentions) error {

View File

@ -9,6 +9,37 @@ import (
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestIntentionApply_new(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Setup a basic record to create
ixn := structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
SourceName: "test",
},
}
var reply string
// Create
if err := msgpackrpc.CallWithCodec(codec, "Intention.Apply", &ixn, &reply); err != nil {
t.Fatalf("err: %v", err)
}
if reply == "" {
t.Fatal("reply should be non-empty")
}
// TODO test read
}
func TestIntentionList(t *testing.T) {
t.Parallel()
dir1, s1 := testServer(t)

View File

@ -69,3 +69,33 @@ type IndexedIntentions struct {
Intentions Intentions
QueryMeta
}
// IntentionOp is the operation for a request related to intentions.
type IntentionOp string
const (
IntentionOpCreate IntentionOp = "create"
IntentionOpUpdate IntentionOp = "update"
IntentionOpDelete IntentionOp = "delete"
)
// IntentionRequest is used to create, update, and delete intentions.
type IntentionRequest struct {
// Datacenter is the target for this request.
Datacenter string
// Op is the type of operation being requested.
Op IntentionOp
// Intention is the intention.
Intention *Intention
// WriteRequest is a common struct containing ACL tokens and other
// write-related common elements for requests.
WriteRequest
}
// RequestDatacenter returns the datacenter for a given request.
func (q *IntentionRequest) RequestDatacenter() string {
return q.Datacenter
}

View File

@ -39,6 +39,7 @@ const (
AutopilotRequestType = 9
AreaRequestType = 10
ACLBootstrapRequestType = 11 // FSM snapshots only.
IntentionRequestType = 12
)
const (