mirror of https://github.com/hashicorp/consul
consul/state: node registration and retrieval works
parent
8181be18f2
commit
82039191a1
|
@ -5,19 +5,26 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// StateStore is where we store all of Consul's state, including
|
||||||
|
// records of node registrations, services, checks, key/value
|
||||||
|
// pairs and more. The DB is entirely in-memory and is constructed
|
||||||
|
// from the Raft log through the FSM.
|
||||||
type StateStore struct {
|
type StateStore struct {
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
db *memdb.MemDB
|
db *memdb.MemDB
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IndexEntry keeps a record of the last index per-table.
|
||||||
type IndexEntry struct {
|
type IndexEntry struct {
|
||||||
Key string
|
Key string
|
||||||
Value uint64
|
Value uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NewStateStore creates a new in-memory state storage layer.
|
||||||
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
|
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
|
||||||
// Create the in-memory DB
|
// Create the in-memory DB
|
||||||
db, err := memdb.NewMemDB(stateStoreSchema())
|
db, err := memdb.NewMemDB(stateStoreSchema())
|
||||||
|
@ -32,3 +39,62 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
|
||||||
}
|
}
|
||||||
return s, nil
|
return s, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EnsureNode is used to upsert node registration or modification.
|
||||||
|
func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Call the node upsert
|
||||||
|
if err := s.ensureNodeTxn(idx, node, tx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.Commit()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensureNodeTxn is the inner function called to actually create a node
|
||||||
|
// registration or modify an existing one in the state store. It allows
|
||||||
|
// passing in a memdb transaction so it may be part of a larger txn.
|
||||||
|
func (s *StateStore) ensureNodeTxn(idx uint64, node *structs.Node, tx *memdb.Txn) error {
|
||||||
|
// Check for an existing node
|
||||||
|
existing, err := tx.First("nodes", "id", node.Node)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("node lookup failed: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the indexes
|
||||||
|
if existing != nil {
|
||||||
|
node.CreateIndex = existing.(*structs.Node).CreateIndex
|
||||||
|
node.ModifyIndex = idx
|
||||||
|
} else {
|
||||||
|
node.CreateIndex = idx
|
||||||
|
node.ModifyIndex = idx
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert the node and update the index
|
||||||
|
if err := tx.Insert("nodes", node); err != nil {
|
||||||
|
return fmt.Errorf("failed inserting node: %s", err)
|
||||||
|
}
|
||||||
|
if err := tx.Insert("index", &IndexEntry{"nodes", idx}); err != nil {
|
||||||
|
return fmt.Errorf("failed updating index: %s", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetNode is used to retrieve a node registration by node ID.
|
||||||
|
func (s *StateStore) GetNode(id string) (*structs.Node, error) {
|
||||||
|
tx := s.db.Txn(true)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Retrieve the node from the state store
|
||||||
|
node, err := tx.First("nodes", "id", id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("node lookup failed: %s", err)
|
||||||
|
}
|
||||||
|
if node != nil {
|
||||||
|
return node.(*structs.Node), nil
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func testStateStore(t *testing.T) *StateStore {
|
||||||
|
s, err := NewStateStore(os.Stderr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
if s == nil {
|
||||||
|
t.Fatalf("missing state store")
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStateStore_EnsureNode(t *testing.T) {
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Create a node registration request
|
||||||
|
in := &structs.Node{
|
||||||
|
Node: "node1",
|
||||||
|
Address: "1.1.1.1",
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the node is registered in the db
|
||||||
|
if err := s.EnsureNode(1, in); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the node again
|
||||||
|
out, err := s.GetNode("node1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Correct node was returned
|
||||||
|
if out.Node != "node1" || out.Address != "1.1.1.1" {
|
||||||
|
t.Fatalf("bad node returned: %#v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Indexes are set properly
|
||||||
|
if out.CreateIndex != 1 || out.ModifyIndex != 1 {
|
||||||
|
t.Fatalf("bad node index: %#v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update the node registration
|
||||||
|
in.Address = "1.1.1.2"
|
||||||
|
if err := s.EnsureNode(2, in); err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve the node
|
||||||
|
out, err = s.GetNode("node1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Node and indexes were updated
|
||||||
|
if out.CreateIndex != 1 || out.ModifyIndex != 2 || out.Address != "1.1.1.2" {
|
||||||
|
t.Fatalf("bad: %#v", out)
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,6 +17,13 @@ var (
|
||||||
|
|
||||||
type MessageType uint8
|
type MessageType uint8
|
||||||
|
|
||||||
|
// Index is used to track the index used while creating
|
||||||
|
// or modifying a given struct type.
|
||||||
|
type Index struct {
|
||||||
|
CreateIndex uint64
|
||||||
|
ModifyIndex uint64
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
RegisterRequestType MessageType = iota
|
RegisterRequestType MessageType = iota
|
||||||
DeregisterRequestType
|
DeregisterRequestType
|
||||||
|
@ -224,6 +231,8 @@ func (r *ChecksInStateRequest) RequestDatacenter() string {
|
||||||
type Node struct {
|
type Node struct {
|
||||||
Node string
|
Node string
|
||||||
Address string
|
Address string
|
||||||
|
|
||||||
|
Index
|
||||||
}
|
}
|
||||||
type Nodes []Node
|
type Nodes []Node
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue