More HTTP endpoints

pull/19/head
Armon Dadgar 2013-12-23 16:20:51 -08:00
parent 9e7feea454
commit 39d9e3e78f
10 changed files with 168 additions and 15 deletions

View File

@ -44,6 +44,14 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
logOutput = os.Stderr logOutput = os.Stderr
} }
// Validate the config
if config.Datacenter == "" {
return nil, fmt.Errorf("Must configure a Datacenter")
}
if config.DataDir == "" {
return nil, fmt.Errorf("Must configure a DataDir")
}
agent := &Agent{ agent := &Agent{
config: config, config: config,
logger: log.New(logOutput, "", log.LstdFlags), logger: log.New(logOutput, "", log.LstdFlags),

View File

@ -16,6 +16,7 @@ func nextConfig() *Config {
idx := atomic.AddUint64(&offset, 1) idx := atomic.AddUint64(&offset, 1)
conf := DefaultConfig() conf := DefaultConfig()
conf.Datacenter = "dc1"
conf.HTTPAddr = fmt.Sprintf("127.0.0.1:%d", 8500+10*idx) conf.HTTPAddr = fmt.Sprintf("127.0.0.1:%d", 8500+10*idx)
conf.RPCAddr = fmt.Sprintf("127.0.0.1:%d", 8400+10*idx) conf.RPCAddr = fmt.Sprintf("127.0.0.1:%d", 8400+10*idx)
conf.SerfBindAddr = "127.0.0.1" conf.SerfBindAddr = "127.0.0.1"

View File

@ -1,13 +1,63 @@
package agent package agent
import ( import (
"fmt"
"github.com/hashicorp/consul/consul/structs"
"net/http" "net/http"
) )
func (s *HTTPServer) CatalogDatacenters(req *http.Request) (interface{}, error) { /*
* /v1/catalog/register : Registers a new service
* /v1/catalog/deregister : Deregisters a service or node
* /v1/catalog/datacenters : Lists known datacenters
* /v1/catalog/nodes : Lists nodes in a given DC
* /v1/catalog/services : Lists services in a given DC
* /v1/catalog/service/<service>/ : Lists the nodes in a given service
* /v1/catalog/node/<node>/ : Lists the services provided by a node
*/
func (s *HTTPServer) CatalogRegister(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.RegisterRequest
if err := decodeBody(req, &args); err != nil {
resp.WriteHeader(400)
resp.Write([]byte(fmt.Sprintf("Request decode failed: %v", err)))
return nil, nil
}
// Setup the default DC if not provided
if args.Datacenter == "" {
args.Datacenter = s.agent.config.Datacenter
}
s.logger.Printf("[DEBUG] ARGS: %#v %v %#v", args, args.Datacenter == "", s.agent.config)
// Forward to the servers
var out struct{}
if err := s.agent.RPC("Catalog.Register", &args, &out); err != nil {
return nil, err
}
return true, nil
}
func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var out []string var out []string
if err := s.agent.RPC("Catalog.ListDatacenters", struct{}{}, &out); err != nil { if err := s.agent.RPC("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
return nil, err return nil, err
} }
return out, nil return out, nil
} }
func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
// Set default DC
dc := s.agent.config.Datacenter
// Check for other DC
if other := req.URL.Query().Get("dc"); other != "" {
dc = other
}
var out structs.Nodes
if err := s.agent.RPC("Catalog.ListNodes", dc, &out); err != nil {
return nil, err
}
return out, nil
}

View File

@ -1,17 +1,54 @@
package agent package agent
import ( import (
"github.com/hashicorp/consul/consul/structs"
"net/http"
"os" "os"
"testing" "testing"
"time"
) )
func TestCatalogRegister(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Wait for a leader
time.Sleep(100 * time.Millisecond)
// Register node
req, err := http.NewRequest("GET", "/v1/catalog/register", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
args := &structs.RegisterRequest{
Node: "foo",
Address: "127.0.0.1",
}
req.Body = encodeReq(args)
obj, err := srv.CatalogRegister(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
res := obj.(bool)
if res != true {
t.Fatalf("bad: %v", res)
}
}
func TestCatalogDatacenters(t *testing.T) { func TestCatalogDatacenters(t *testing.T) {
dir, srv := makeHTTPServer(t) dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown() defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
obj, err := srv.CatalogDatacenters(nil) // Wait for initialization
time.Sleep(10 * time.Millisecond)
obj, err := srv.CatalogDatacenters(nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -21,3 +58,39 @@ func TestCatalogDatacenters(t *testing.T) {
t.Fatalf("bad: %v", obj) t.Fatalf("bad: %v", obj)
} }
} }
func TestCatalogNodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Wait for a leader
time.Sleep(100 * time.Millisecond)
// Register node
args := &structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
}
req, err := http.NewRequest("GET", "/v1/catalog/nodes?dc=dc1", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
obj, err := srv.CatalogNodes(nil, req)
if err != nil {
t.Fatalf("err: %v", err)
}
nodes := obj.(structs.Nodes)
if len(nodes) != 1 {
t.Fatalf("bad: %v", obj)
}
}

View File

@ -56,17 +56,18 @@ func (s *HTTPServer) registerHandlers() {
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeers)) s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeers))
s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) s.mux.HandleFunc("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.mux.HandleFunc("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
} }
// wrap is used to wrap functions to make them more convenient // wrap is used to wrap functions to make them more convenient
func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) { func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Request) (interface{}, error)) func(resp http.ResponseWriter, req *http.Request) {
f := func(resp http.ResponseWriter, req *http.Request) { f := func(resp http.ResponseWriter, req *http.Request) {
// Invoke the handler // Invoke the handler
start := time.Now() start := time.Now()
defer func() { defer func() {
s.logger.Printf("[DEBUG] HTTP Request %v (%v)", req.URL, time.Now().Sub(start)) s.logger.Printf("[DEBUG] HTTP Request %v (%v)", req.URL, time.Now().Sub(start))
}() }()
obj, err := handler(req) obj, err := handler(resp, req)
// Check for an error // Check for an error
HAS_ERR: HAS_ERR:
@ -78,6 +79,7 @@ func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error))
} }
// Write out the JSON object // Write out the JSON object
if obj != nil {
var buf bytes.Buffer var buf bytes.Buffer
enc := json.NewEncoder(&buf) enc := json.NewEncoder(&buf)
if err = enc.Encode(obj); err != nil { if err = enc.Encode(obj); err != nil {
@ -85,5 +87,12 @@ func (s *HTTPServer) wrap(handler func(req *http.Request) (interface{}, error))
} }
resp.Write(buf.Bytes()) resp.Write(buf.Bytes())
} }
}
return f return f
} }
// decodeBody is used to decode a JSON request body
func decodeBody(req *http.Request, out interface{}) error {
dec := json.NewDecoder(req.Body)
return dec.Decode(out)
}

View File

@ -1,6 +1,10 @@
package agent package agent
import ( import (
"bytes"
"encoding/json"
"io"
"io/ioutil"
"testing" "testing"
) )
@ -13,3 +17,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) {
} }
return dir, server return dir, server
} }
func encodeReq(obj interface{}) io.ReadCloser {
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
enc.Encode(obj)
return ioutil.NopCloser(buf)
}

View File

@ -4,7 +4,7 @@ import (
"net/http" "net/http"
) )
func (s *HTTPServer) StatusLeader(req *http.Request) (interface{}, error) { func (s *HTTPServer) StatusLeader(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var out string var out string
if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil { if err := s.agent.RPC("Status.Leader", struct{}{}, &out); err != nil {
return nil, err return nil, err
@ -12,7 +12,7 @@ func (s *HTTPServer) StatusLeader(req *http.Request) (interface{}, error) {
return out, nil return out, nil
} }
func (s *HTTPServer) StatusPeers(req *http.Request) (interface{}, error) { func (s *HTTPServer) StatusPeers(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var out []string var out []string
if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil { if err := s.agent.RPC("Status.Peers", struct{}{}, &out); err != nil {
return nil, err return nil, err

View File

@ -15,7 +15,7 @@ func TestStatusLeader(t *testing.T) {
// Wait for a leader // Wait for a leader
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
obj, err := srv.StatusLeader(nil) obj, err := srv.StatusLeader(nil, nil)
if err != nil { if err != nil {
t.Fatalf("Err: %v", err) t.Fatalf("Err: %v", err)
} }
@ -31,7 +31,7 @@ func TestStatusPeers(t *testing.T) {
defer srv.Shutdown() defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
obj, err := srv.StatusPeers(nil) obj, err := srv.StatusPeers(nil, nil)
if err != nil { if err != nil {
t.Fatalf("Err: %v", err) t.Fatalf("Err: %v", err)
} }

View File

@ -117,6 +117,7 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
servers := s.remoteConsuls[dc] servers := s.remoteConsuls[dc]
if len(servers) == 0 { if len(servers) == 0 {
s.remoteLock.RUnlock() s.remoteLock.RUnlock()
s.logger.Printf("[WARN] consul: RPC request for DC '%s', no path found", dc)
return structs.ErrNoDCPath return structs.ErrNoDCPath
} }

View File

@ -259,7 +259,7 @@ func (s *Server) Shutdown() error {
s.raftLayer.Close() s.raftLayer.Close()
future := s.raft.Shutdown() future := s.raft.Shutdown()
if err := future.Error(); err != nil { if err := future.Error(); err != nil {
s.logger.Printf("[WARN] Error shutting down raft: %s", err) s.logger.Printf("[WARN] consul: Error shutting down raft: %s", err)
} }
s.raftStore.Close() s.raftStore.Close()
} }
@ -324,7 +324,7 @@ func (s *Server) Leave() error {
s.logger.Printf("[ERR] Failed to leave Raft cluster: %v", err) s.logger.Printf("[ERR] Failed to leave Raft cluster: %v", err)
} }
case <-time.After(3 * time.Second): case <-time.After(3 * time.Second):
s.logger.Printf("[ERR] Timedout leaving Raft cluster") s.logger.Printf("[ERR] Timed out leaving Raft cluster")
} }
} }
AFTER_LEAVE: AFTER_LEAVE: