From 05d5eb08a8e45adc2ffd5a9e86bc78c43208fa4a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 11 Dec 2013 14:27:27 -0800 Subject: [PATCH] Adding state store methods --- consul/state_store.go | 66 ++++++++++++++++++++++++++++++--- consul/state_store_test.go | 76 +++++++++++++++++++++++++++++++++++--- 2 files changed, 132 insertions(+), 10 deletions(-) diff --git a/consul/state_store.go b/consul/state_store.go index 41dedfb6f6..f7bbe02733 100644 --- a/consul/state_store.go +++ b/consul/state_store.go @@ -10,9 +10,19 @@ type namedQuery uint8 const ( queryEnsureNode namedQuery = iota - queryGetNodes + queryNode + queryNodes + queryEnsureService + queryNodeServices ) +// NoodeServices maps the Service name to a tag and port +type ServiceEntry struct { + Tag string + Port int +} +type NodeServices map[string]ServiceEntry + // The StateStore is responsible for maintaining all the Consul // state. It is manipulated by the FSM which maintains consistency // through the use of Raft. The goals of the StateStore are to provide @@ -78,8 +88,11 @@ func (s *StateStore) initialize() error { // Prepare the queries queries := map[namedQuery]string{ - queryEnsureNode: "INSERT OR REPLACE INTO nodes (name, address) VALUES (?, ?)", - queryGetNodes: "SELECT * FROM nodes", + queryEnsureNode: "INSERT OR REPLACE INTO nodes (name, address) VALUES (?, ?)", + queryNode: "SELECT address FROM nodes where name=?", + queryNodes: "SELECT * FROM nodes", + queryEnsureService: "INSERT OR REPLACE INTO services (node, service, tag, port) VALUES (?, ?, ?, ?)", + queryNodeServices: "SELECT service, tag, port from services where node=?", } for name, query := range queries { stmt, err := s.db.Prepare(query) @@ -111,10 +124,26 @@ func (s *StateStore) EnsureNode(name string, address string) error { return s.checkSet(stmt.Exec(name, address)) } +// GetNode returns all the address of the known and if it was found +func (s *StateStore) GetNode(name string) (bool, string) { + stmt := s.prepared[queryNode] + row := stmt.QueryRow(name) + + var addr string + if err := row.Scan(&addr); err != nil { + if err == sql.ErrNoRows { + return false, addr + } else { + panic(fmt.Errorf("Failed to get node: %v", err)) + } + } + return true, addr +} + // GetNodes returns all the known nodes, the slice alternates between // the node name and address -func (s *StateStore) GetNodes() []string { - stmt := s.prepared[queryGetNodes] +func (s *StateStore) Nodes() []string { + stmt := s.prepared[queryNodes] rows, err := stmt.Query() if err != nil { panic(fmt.Errorf("Failed to get nodes: %v", err)) @@ -130,3 +159,30 @@ func (s *StateStore) GetNodes() []string { } return data } + +// EnsureService is used to ensure a given node exposes a service +func (s *StateStore) EnsureService(name, service, tag string, port int) error { + stmt := s.prepared[queryEnsureService] + return s.checkSet(stmt.Exec(name, service, tag, port)) +} + +// NodeServices is used to return all the services of a given node +func (s *StateStore) NodeServices(name string) NodeServices { + stmt := s.prepared[queryNodeServices] + rows, err := stmt.Query(name) + if err != nil { + panic(fmt.Errorf("Failed to get node services: %v", err)) + } + + services := NodeServices(make(map[string]ServiceEntry)) + var service string + var entry ServiceEntry + for rows.Next() { + if err := rows.Scan(&service, &entry.Tag, &entry.Port); err != nil { + panic(fmt.Errorf("Failed to get node services: %v", err)) + } + services[service] = entry + } + + return services +} diff --git a/consul/state_store_test.go b/consul/state_store_test.go index ae633432a6..bbae9936ec 100644 --- a/consul/state_store_test.go +++ b/consul/state_store_test.go @@ -15,17 +15,83 @@ func TestEnsureNode(t *testing.T) { t.Fatalf("err: %v") } - nodes := store.GetNodes() - if nodes[0] != "foo" || nodes[1] != "127.0.0.1" { - t.Fatalf("Bad: %v", nodes) + found, addr := store.GetNode("foo") + if !found || addr != "127.0.0.1" { + t.Fatalf("Bad: %v %v", found, addr) } if err := store.EnsureNode("foo", "127.0.0.2"); err != nil { t.Fatalf("err: %v") } - nodes = store.GetNodes() - if nodes[0] != "foo" || nodes[1] != "127.0.0.2" { + found, addr = store.GetNode("foo") + if !found || addr != "127.0.0.2" { + t.Fatalf("Bad: %v %v", found, addr) + } +} + +func TestGetNodes(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureNode("bar", "127.0.0.2"); err != nil { + t.Fatalf("err: %v") + } + + nodes := store.Nodes() + if len(nodes) != 4 { t.Fatalf("Bad: %v", nodes) } + if nodes[0] != "foo" && nodes[2] != "bar" { + t.Fatalf("Bad: %v", nodes) + } +} + +func TestEnsureService(t *testing.T) { + store, err := NewStateStore() + if err != nil { + t.Fatalf("err: %v", err) + } + defer store.Close() + + if err := store.EnsureNode("foo", "127.0.0.1"); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "api", "", 5000); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "api", "", 5001); err != nil { + t.Fatalf("err: %v") + } + + if err := store.EnsureService("foo", "db", "master", 8000); err != nil { + t.Fatalf("err: %v") + } + + services := store.NodeServices("foo") + + entry, ok := services["api"] + if !ok { + t.Fatalf("missing api: %#v", services) + } + if entry.Tag != "" || entry.Port != 5001 { + t.Fatalf("Bad entry: %#v", entry) + } + + entry, ok = services["db"] + if !ok { + t.Fatalf("missing db: %#v", services) + } + if entry.Tag != "master" || entry.Port != 8000 { + t.Fatalf("Bad entry: %#v", entry) + } }