From 39d9e3e78fa05f7304c15be642faac27b27eb582 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 23 Dec 2013 16:20:51 -0800 Subject: [PATCH] More HTTP endpoints --- command/agent/agent.go | 8 +++ command/agent/agent_test.go | 1 + command/agent/catalog_endpoint.go | 52 +++++++++++++++++- command/agent/catalog_endpoint_test.go | 75 +++++++++++++++++++++++++- command/agent/http.go | 23 +++++--- command/agent/http_test.go | 11 ++++ command/agent/status_endpoint.go | 4 +- command/agent/status_endpoint_test.go | 4 +- consul/rpc.go | 1 + consul/server.go | 4 +- 10 files changed, 168 insertions(+), 15 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 4a64ff077d..32133d9fc2 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -44,6 +44,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { logOutput = os.Stderr } + // Validate the config + if config.Datacenter == "" { + return nil, fmt.Errorf("Must configure a Datacenter") + } + if config.DataDir == "" { + return nil, fmt.Errorf("Must configure a DataDir") + } + agent := &Agent{ config: config, logger: log.New(logOutput, "", log.LstdFlags), diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 09fc234342..6b21a62fb0 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -16,6 +16,7 @@ func nextConfig() *Config { idx := atomic.AddUint64(&offset, 1) conf := DefaultConfig() + conf.Datacenter = "dc1" conf.HTTPAddr = fmt.Sprintf("127.0.0.1:%d", 8500+10*idx) conf.RPCAddr = fmt.Sprintf("127.0.0.1:%d", 8400+10*idx) conf.SerfBindAddr = "127.0.0.1" diff --git a/command/agent/catalog_endpoint.go b/command/agent/catalog_endpoint.go index e234caf794..23593e45be 100644 --- a/command/agent/catalog_endpoint.go +++ b/command/agent/catalog_endpoint.go @@ -1,13 +1,63 @@ package agent import ( + "fmt" + "github.com/hashicorp/consul/consul/structs" "net/http" ) -func (s *HTTPServer) CatalogDatacenters(req *http.Request) (interface{}, error) { +/* +* /v1/catalog/register : Registers a new service +* /v1/catalog/deregister : Deregisters a service or node +* /v1/catalog/datacenters : Lists known datacenters +* /v1/catalog/nodes : Lists nodes in a given DC +* /v1/catalog/services : Lists services in a given DC +* /v1/catalog/service// : Lists the nodes in a given service +* /v1/catalog/node// : Lists the services provided by a node + */ + +func (s *HTTPServer) CatalogRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var args structs.RegisterRequest + if err := decodeBody(req, &args); err != nil { + resp.WriteHeader(400) + resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err))) + return nil, nil + } + + // Setup the default DC if not provided + if args.Datacenter == "" { + args.Datacenter = s.agent.config.Datacenter + } + s.logger.Printf("[DEBUG] ARGS: %#v %v %#v", args, args.Datacenter == "", s.agent.config) + + // Forward to the servers + var out struct{} + if err := s.agent.RPC("Catalog.Register", &args, &out); err != nil { + return nil, err + } + return true, nil +} + +func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var out []string if err := s.agent.RPC("Catalog.ListDatacenters", struct{}{}, &out); err != nil { return nil, err } return out, nil } + +func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Set default DC + dc := s.agent.config.Datacenter + + // Check for other DC + if other := req.URL.Query().Get("dc"); other != "" { + dc = other + } + + var out structs.Nodes + if err := s.agent.RPC("Catalog.ListNodes", dc, &out); err != nil { + return nil, err + } + return out, nil +} diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index a5a4eda944..1649383e3e 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -1,17 +1,54 @@ package agent import ( + "github.com/hashicorp/consul/consul/structs" + "net/http" "os" "testing" + "time" ) +func TestCatalogRegister(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Register node + req, err := http.NewRequest("GET", "/v1/catalog/register", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + args := &structs.RegisterRequest{ + Node: "foo", + Address: "127.0.0.1", + } + req.Body = encodeReq(args) + + obj, err := srv.CatalogRegister(nil, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + res := obj.(bool) + if res != true { + t.Fatalf("bad: %v", res) + } +} + func TestCatalogDatacenters(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) defer srv.Shutdown() defer srv.agent.Shutdown() - obj, err := srv.CatalogDatacenters(nil) + // Wait for initialization + time.Sleep(10 * time.Millisecond) + + obj, err := srv.CatalogDatacenters(nil, nil) if err != nil { t.Fatalf("err: %v", err) } @@ -21,3 +58,39 @@ func TestCatalogDatacenters(t *testing.T) { t.Fatalf("bad: %v", obj) } } + +func TestCatalogNodes(t *testing.T) { + dir, srv := makeHTTPServer(t) + defer os.RemoveAll(dir) + defer srv.Shutdown() + defer srv.agent.Shutdown() + + // Wait for a leader + time.Sleep(100 * time.Millisecond) + + // Register node + args := &structs.RegisterRequest{ + Datacenter: "dc1", + Node: "foo", + Address: "127.0.0.1", + } + var out struct{} + if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil { + t.Fatalf("err: %v", err) + } + + req, err := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + obj, err := srv.CatalogNodes(nil, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + nodes := obj.(structs.Nodes) + if len(nodes) != 1 { + t.Fatalf("bad: %v", obj) + } +} diff --git a/command/agent/http.go b/command/agent/http.go index e0ea2b6c0d..dd71d813f3 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -56,17 +56,18 @@ func (s *HTTPServer) registerHandlers() { s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeers)) s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) + s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) } // wrap is used to wrap functions to make them more convenient -func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { +func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { f := func(resp http.ResponseWriter, req *http.Request) { // Invoke the handler start := time.Now() defer func() { s.logger.Printf("[DEBUG] HTTP Request %v (%v)", req.URL, time.Now().Sub(start)) }() - obj, err := handler(req) + obj, err := handler(resp, req) // Check for an error HAS_ERR: @@ -78,12 +79,20 @@ func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error)) } // Write out the JSON object - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - if err = enc.Encode(obj); err != nil { - goto HAS_ERR + if obj != nil { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + if err = enc.Encode(obj); err != nil { + goto HAS_ERR + } + resp.Write(buf.Bytes()) } - resp.Write(buf.Bytes()) } return f } + +// decodeBody is used to decode a JSON request body +func decodeBody(req *http.Request, out interface{}) error { + dec := json.NewDecoder(req.Body) + return dec.Decode(out) +} diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 0cf13f92c0..1b8b09556f 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -1,6 +1,10 @@ package agent import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" "testing" ) @@ -13,3 +17,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) { } return dir, server } + +func encodeReq(obj interface{}) io.ReadCloser { + buf := bytes.NewBuffer(nil) + enc := json.NewEncoder(buf) + enc.Encode(obj) + return ioutil.NopCloser(buf) +} diff --git a/command/agent/status_endpoint.go b/command/agent/status_endpoint.go index 6a98eeb869..75275800fd 100644 --- a/command/agent/status_endpoint.go +++ b/command/agent/status_endpoint.go @@ -4,7 +4,7 @@ import ( "net/http" ) -func (s *HTTPServer) StatusLeader(req *http.Request) (interface{}, error) { +func (s *HTTPServer) StatusLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var out string if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil { return nil, err @@ -12,7 +12,7 @@ func (s *HTTPServer) StatusLeader(req *http.Request) (interface{}, error) { return out, nil } -func (s *HTTPServer) StatusPeers(req *http.Request) (interface{}, error) { +func (s *HTTPServer) StatusPeers(resp http.ResponseWriter, req *http.Request) (interface{}, error) { var out []string if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil { return nil, err diff --git a/command/agent/status_endpoint_test.go b/command/agent/status_endpoint_test.go index aaf0e4559d..835b80bd9f 100644 --- a/command/agent/status_endpoint_test.go +++ b/command/agent/status_endpoint_test.go @@ -15,7 +15,7 @@ func TestStatusLeader(t *testing.T) { // Wait for a leader time.Sleep(100 * time.Millisecond) - obj, err := srv.StatusLeader(nil) + obj, err := srv.StatusLeader(nil, nil) if err != nil { t.Fatalf("Err: %v", err) } @@ -31,7 +31,7 @@ func TestStatusPeers(t *testing.T) { defer srv.Shutdown() defer srv.agent.Shutdown() - obj, err := srv.StatusPeers(nil) + obj, err := srv.StatusPeers(nil, nil) if err != nil { t.Fatalf("Err: %v", err) } diff --git a/consul/rpc.go b/consul/rpc.go index acfbd5db0a..bd2b88d65b 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -117,6 +117,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ servers := s.remoteConsuls[dc] if len(servers) == 0 { s.remoteLock.RUnlock() + s.logger.Printf("[WARN] consul: RPC request for DC '%s', no path found", dc) return structs.ErrNoDCPath } diff --git a/consul/server.go b/consul/server.go index d05c0e529f..4473a05f22 100644 --- a/consul/server.go +++ b/consul/server.go @@ -259,7 +259,7 @@ func (s *Server) Shutdown() error { s.raftLayer.Close() future := s.raft.Shutdown() if err := future.Error(); err != nil { - s.logger.Printf("[WARN] Error shutting down raft: %s", err) + s.logger.Printf("[WARN] consul: Error shutting down raft: %s", err) } s.raftStore.Close() } @@ -324,7 +324,7 @@ func (s *Server) Leave() error { s.logger.Printf("[ERR] Failed to leave Raft cluster: %v", err) } case <-time.After(3 * time.Second): - s.logger.Printf("[ERR] Timedout leaving Raft cluster") + s.logger.Printf("[ERR] Timed out leaving Raft cluster") } } AFTER_LEAVE: