agent/consul/state: initial work on intentions memdb table

pull/4275/head
Mitchell Hashimoto 2018-02-27 22:25:05 -08:00
parent 97f87b2357
commit cc8a6f7f15
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
4 changed files with 324 additions and 0 deletions

View File

@ -0,0 +1,136 @@
package state
import (
"fmt"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
const (
intentionsTableName = "connect-intentions"
)
// intentionsTableSchema returns a new table schema used for storing
// intentions for Connect.
func intentionsTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: intentionsTableName,
Indexes: map[string]*memdb.IndexSchema{
"id": &memdb.IndexSchema{
Name: "id",
AllowMissing: false,
Unique: true,
Indexer: &memdb.UUIDFieldIndex{
Field: "ID",
},
},
"destination": &memdb.IndexSchema{
Name: "destination",
AllowMissing: true,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "DestinationNS",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "DestinationName",
Lowercase: true,
},
},
},
},
"source": &memdb.IndexSchema{
Name: "source",
AllowMissing: true,
Unique: true,
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "SourceNS",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "SourceName",
Lowercase: true,
},
},
},
},
},
}
}
func init() {
registerSchema(intentionsTableSchema)
}
// IntentionSet creates or updates an intention.
func (s *Store) IntentionSet(idx uint64, ixn *structs.Intention) error {
tx := s.db.Txn(true)
defer tx.Abort()
if err := s.intentionSetTxn(tx, idx, ixn); err != nil {
return err
}
tx.Commit()
return nil
}
// intentionSetTxn is the inner method used to insert an intention with
// the proper indexes into the state store.
func (s *Store) intentionSetTxn(tx *memdb.Txn, idx uint64, ixn *structs.Intention) error {
// ID is required
if ixn.ID == "" {
return ErrMissingIntentionID
}
// Check for an existing intention
existing, err := tx.First(intentionsTableName, "id", ixn.ID)
if err != nil {
return fmt.Errorf("failed intention looup: %s", err)
}
if existing != nil {
ixn.CreateIndex = existing.(*structs.Intention).CreateIndex
} else {
ixn.CreateIndex = idx
}
ixn.ModifyIndex = idx
// Insert
if err := tx.Insert(intentionsTableName, ixn); err != nil {
return err
}
if err := tx.Insert("index", &IndexEntry{intentionsTableName, idx}); err != nil {
return fmt.Errorf("failed updating index: %s", err)
}
return nil
}
// IntentionGet returns the given intention by ID.
func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) {
tx := s.db.Txn(false)
defer tx.Abort()
// Get the table index.
idx := maxIndexTxn(tx, intentionsTableName)
// Look up by its ID.
watchCh, intention, err := tx.FirstWatch(intentionsTableName, "id", id)
if err != nil {
return 0, nil, fmt.Errorf("failed intention lookup: %s", err)
}
ws.Add(watchCh)
// Convert the interface{} if it is non-nil
var result *structs.Intention
if intention != nil {
result = intention.(*structs.Intention)
}
return idx, result, nil
}

View File

@ -0,0 +1,122 @@
package state
import (
"reflect"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-memdb"
)
func TestStore_IntentionGet_none(t *testing.T) {
s := testStateStore(t)
// Querying with no results returns nil.
ws := memdb.NewWatchSet()
idx, res, err := s.IntentionGet(ws, testUUID())
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
}
func TestStore_IntentionSetGet_basic(t *testing.T) {
s := testStateStore(t)
// Call Get to populate the watch set
ws := memdb.NewWatchSet()
_, _, err := s.IntentionGet(ws, testUUID())
if err != nil {
t.Fatalf("err: %s", err)
}
// Build a valid intention
ixn := &structs.Intention{
ID: testUUID(),
}
// Inserting a with empty ID is disallowed.
if err := s.IntentionSet(1, ixn); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex(intentionsTableName); idx != 1 {
t.Fatalf("bad index: %d", idx)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Read it back out and verify it.
expected := &structs.Intention{
ID: ixn.ID,
RaftIndex: structs.RaftIndex{
CreateIndex: 1,
ModifyIndex: 1,
},
}
ws = memdb.NewWatchSet()
idx, actual, err := s.IntentionGet(ws, ixn.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != expected.CreateIndex {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %v", actual)
}
// Change a value and test updating
ixn.SourceNS = "foo"
if err := s.IntentionSet(2, ixn); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the index got updated.
if idx := s.maxIndex(intentionsTableName); idx != 2 {
t.Fatalf("bad index: %d", idx)
}
if !watchFired(ws) {
t.Fatalf("bad")
}
// Read it back and verify the data was updated
expected.SourceNS = ixn.SourceNS
expected.ModifyIndex = 2
ws = memdb.NewWatchSet()
idx, actual, err = s.IntentionGet(ws, ixn.ID)
if err != nil {
t.Fatalf("err: %s", err)
}
if idx != expected.ModifyIndex {
t.Fatalf("bad index: %d", idx)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("bad: %#v", actual)
}
}
func TestStore_IntentionSet_emptyId(t *testing.T) {
s := testStateStore(t)
ws := memdb.NewWatchSet()
_, _, err := s.IntentionGet(ws, testUUID())
if err != nil {
t.Fatalf("err: %s", err)
}
// Inserting a with empty ID is disallowed.
if err := s.IntentionSet(1, &structs.Intention{}); err == nil {
t.Fatalf("expected %#v, got: %#v", ErrMissingIntentionID, err)
}
// Index is not updated if nothing is saved.
if idx := s.maxIndex(intentionsTableName); idx != 0 {
t.Fatalf("bad index: %d", idx)
}
if watchFired(ws) {
t.Fatalf("bad")
}
}

View File

@ -28,6 +28,10 @@ var (
// ErrMissingQueryID is returned when a Query set is called on
// a Query with an empty ID.
ErrMissingQueryID = errors.New("Missing Query ID")
// ErrMissingIntentionID is returned when an Intention set is called
// with an Intention with an empty ID.
ErrMissingIntentionID = errors.New("Missing Intention ID")
)
const (

View File

@ -0,0 +1,62 @@
package structs
import (
"time"
)
// Intention defines an intention for the Connect Service Graph. This defines
// the allowed or denied behavior of a connection between two services using
// Connect.
type Intention struct {
// ID is the UUID-based ID for the intention, always generated by Consul.
ID string
// SourceNS, SourceName are the namespace and name, respectively, of
// the source service. Either of these may be the wildcard "*", but only
// the full value can be a wildcard. Partial wildcards are not allowed.
// The source may also be a non-Consul service, as specified by SourceType.
//
// DestinationNS, DestinationName is the same, but for the destination
// service. The same rules apply. The destination is always a Consul
// service.
SourceNS, SourceName string
DestinationNS, DestinationName string
// SourceType is the type of the value for the source.
SourceType IntentionSourceType
// Action is whether this is a whitelist or blacklist intention.
Action IntentionAction
// DefaultAddr, DefaultPort of the local listening proxy (if any) to
// make this connection.
DefaultAddr string
DefaultPort int
// Meta is arbitrary metadata associated with the intention. This is
// opaque to Consul but is served in API responses.
Meta map[string]string
// CreatedAt and UpdatedAt keep track of when this record was created
// or modified.
CreatedAt, UpdatedAt time.Time
RaftIndex
}
// IntentionAction is the action that the intention represents. This
// can be "allow" or "deny" to whitelist or blacklist intentions.
type IntentionAction string
const (
IntentionActionAllow IntentionAction = "allow"
IntentionActionDeny IntentionAction = "deny"
)
// IntentionSourceType is the type of the source within an intention.
type IntentionSourceType string
const (
// IntentionSourceConsul is a service within the Consul catalog.
IntentionSourceConsul IntentionSourceType = "consul"
)