diff --git a/consul/state_store.go b/consul/state_store.go index 04c99c48f8..762b3c7ade 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -5,7 +5,9 @@ import ( "fmt" "github.com/hashicorp/consul/rpc" _ "github.com/mattn/go-sqlite3" + "log" "sync/atomic" + "time" ) // nextDBIndex is used to generate a new ID @@ -25,6 +27,7 @@ const ( queryServices queryServiceNodes queryServiceTagNodes + queryAllServices ) // The StateStore is responsible for maintaining all the Consul @@ -76,6 +79,7 @@ func (s *StateStore) initialize() error { pragmas := []string{ "pragma journal_mode=memory;", "pragma foreign_keys=ON;", + "pragma read_uncommitted=true;", } for _, p := range pragmas { if _, err := s.db.Exec(p); err != nil { @@ -108,6 +112,7 @@ func (s *StateStore) initialize() error { queryServices: "SELECT DISTINCT service, tag FROM services", queryServiceNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.node=n.name", queryServiceTagNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.tag=? AND s.node=n.name", + queryAllServices: "SELECT * FROM services", } for name, query := range queries { stmt, err := s.db.Prepare(query) @@ -281,3 +286,59 @@ func parseServiceNodes(rows *sql.Rows, err error) rpc.ServiceNodes { } return nodes } + +// Snapshot is used to create a point in time snapshot +func (s *StateStore) Snapshot() (*StateStore, error) { + defer func(start time.Time) { + log.Printf("[INFO] StateStore Snapshot created in %v", time.Now().Sub(start)) + }(time.Now()) + + // Create a new state store + state, err := NewStateStore() + if err != nil { + return nil, err + } + + // Start a Tx on the new DB + tx, err := state.db.Begin() + if err != nil { + state.Close() + return nil, err + } + + // Create the new statements we need + ensureNode := tx.Stmt(state.prepared[queryEnsureNode]) + ensureService := tx.Stmt(state.prepared[queryEnsureService]) + + // Copy all the nodes + nodes := s.Nodes() + for i := 0; i < len(nodes); i += 2 { + if _, err := ensureNode.Exec(nodes[i], nodes[i+1]); err != nil { + state.Close() + return nil, err + } + } + + // Copy all the services + var node, service, tag string + var port int + rows, err := s.prepared[queryAllServices].Query() + for rows.Next() { + if err := rows.Scan(&node, &service, &tag, &port); err != nil { + state.Close() + return nil, err + } + if _, err := ensureService.Exec(node, service, tag, port); err != nil { + state.Close() + return nil, err + } + } + + // Commit the Txn + if err := tx.Commit(); err != nil { + state.Close() + return nil, err + } + + return state, nil +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index 4352751f13..ecdc19df58 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -295,3 +295,79 @@ func TestServiceTagNodes(t *testing.T) { t.Fatalf("bad: %v", nodes) } } + +func TestStoreSnapshot(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("bar", "db", "slave", 8000); err != nil { + t.Fatalf("err: %v") + } + + // Take a snapshot + snap, err := store.Snapshot() + if err != nil { + t.Fatalf("err: %v") + } + defer snap.Close() + + // Check snapshot has old values + nodes := snap.Nodes() + if len(nodes) != 4 { + t.Fatalf("bad: %v", nodes) + } + + // Ensure we get the service entries + services := snap.NodeServices("foo") + if services["db"].Tag != "master" { + t.Fatalf("bad: %v", services) + } + + services = snap.NodeServices("bar") + if services["db"].Tag != "slave" { + t.Fatalf("bad: %v", services) + } + + // Make some changes! + if err := store.EnsureService("foo", "db", "slave", 8000); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.EnsureService("bar", "db", "master", 8000); err != nil { + t.Fatalf("err: %v", err) + } + if err := store.EnsureNode("baz", "127.0.0.3"); err != nil { + t.Fatalf("err: %v", err) + } + + // Check snapshot has old values + nodes = snap.Nodes() + if len(nodes) != 4 { + t.Fatalf("bad: %v", nodes) + } + + // Ensure old service entries + services = snap.NodeServices("foo") + if services["db"].Tag != "master" { + t.Fatalf("bad: %v", services) + } + + services = snap.NodeServices("bar") + if services["db"].Tag != "slave" { + t.Fatalf("bad: %v", services) + } +}