agent: move http/dns endpoints into agent

Move the HTTP and DNS endpoints into the agent and control
their lifespan via the agent.

This removes the requirement to manage HTTP and DNS servers
indpendent of the agent since the agent is mostly useless
without an endpoint and the endpoints without the agent.
pull/3037/head
Frank Schroeder 2017-05-19 11:53:41 +02:00
parent 74be791f9b
commit 82650f73e3
No known key found for this signature in database
GPG Key ID: 4D65C6EAEC87DECD
17 changed files with 543 additions and 590 deletions

View File

@ -1,6 +1,7 @@
package agent
import (
"context"
"crypto/sha512"
"encoding/json"
"errors"
@ -9,6 +10,7 @@ import (
"io/ioutil"
"log"
"net"
"net/http"
"os"
"path/filepath"
"reflect"
@ -141,27 +143,62 @@ type Agent struct {
// agent methods use this, so use with care and never override
// outside of a unit test.
endpoints map[string]string
// dnsAddr is the address the DNS server binds to
dnsAddr net.Addr
// dnsServer provides the DNS API
dnsServers []*DNSServer
// httpAddrs are the addresses per protocol the HTTP server binds to
httpAddrs map[string][]net.Addr
// httpServers provides the HTTP API on various endpoints
httpServers []*HTTPServer
// wgServers is the wait group for all HTTP and DNS servers
wgServers sync.WaitGroup
}
// Create is used to create a new Agent. Returns
// the agent or potentially an error.
func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
// Ensure we have a log sink
func Create(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
a, err := NewAgent(c, logOutput, logWriter, reloadCh)
if err != nil {
return nil, err
}
if err := a.Start(); err != nil {
return nil, err
}
return a, nil
}
func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) {
if logOutput == nil {
logOutput = os.Stderr
}
// Validate the config
if config.Datacenter == "" {
if c.Datacenter == "" {
return nil, fmt.Errorf("Must configure a Datacenter")
}
if config.DataDir == "" && !config.DevMode {
if c.DataDir == "" && !c.DevMode {
return nil, fmt.Errorf("Must configure a DataDir")
}
dnsAddr, err := c.ClientListener(c.Addresses.DNS, c.Ports.DNS)
if err != nil {
return nil, fmt.Errorf("Invalid DNS bind address: %s", err)
}
httpAddrs, err := c.HTTPAddrs()
if err != nil {
return nil, fmt.Errorf("Invalid HTTP bind address: %s", err)
}
acls, err := newACLManager(c)
if err != nil {
return nil, err
}
agent := &Agent{
config: config,
logger: log.New(logOutput, "", log.LstdFlags),
a := &Agent{
config: c,
acls: acls,
logOutput: logOutput,
logWriter: logWriter,
checkReapAfter: make(map[types.CheckID]time.Duration),
@ -175,79 +212,200 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re
reloadCh: reloadCh,
shutdownCh: make(chan struct{}),
endpoints: make(map[string]string),
dnsAddr: dnsAddr,
httpAddrs: httpAddrs,
}
if err := agent.resolveTmplAddrs(); err != nil {
if err := a.resolveTmplAddrs(); err != nil {
return nil, err
}
return a, nil
}
// Initialize the ACL manager.
acls, err := newACLManager(config)
if err != nil {
return nil, err
}
agent.acls = acls
func (a *Agent) Start() error {
c := a.config
a.logger = log.New(a.logOutput, "", log.LstdFlags)
// Retrieve or generate the node ID before setting up the rest of the
// agent, which depends on it.
if err := agent.setupNodeID(config); err != nil {
return nil, fmt.Errorf("Failed to setup node ID: %v", err)
if err := a.setupNodeID(c); err != nil {
return fmt.Errorf("Failed to setup node ID: %v", err)
}
// Initialize the local state.
agent.state.Init(config, agent.logger)
a.state.Init(c, a.logger)
// Setup either the client or the server.
if config.Server {
err = agent.setupServer()
agent.state.SetIface(agent.delegate)
if c.Server {
server, err := a.makeServer()
if err != nil {
return err
}
a.delegate = server
a.state.SetIface(server)
// Automatically register the "consul" service on server nodes
consulService := structs.NodeService{
Service: consul.ConsulServiceName,
ID: consul.ConsulServiceID,
Port: agent.config.Ports.Server,
Port: c.Ports.Server,
Tags: []string{},
}
agent.state.AddService(&consulService, agent.config.GetTokenForAgent())
a.state.AddService(&consulService, c.GetTokenForAgent())
} else {
err = agent.setupClient()
agent.state.SetIface(agent.delegate)
}
client, err := a.makeClient()
if err != nil {
return nil, err
return err
}
a.delegate = client
a.state.SetIface(client)
}
// Load checks/services/metadata.
if err := agent.loadServices(config); err != nil {
return nil, err
if err := a.loadServices(c); err != nil {
return err
}
if err := agent.loadChecks(config); err != nil {
return nil, err
if err := a.loadChecks(c); err != nil {
return err
}
if err := agent.loadMetadata(config); err != nil {
return nil, err
if err := a.loadMetadata(c); err != nil {
return err
}
// Start watching for critical services to deregister, based on their
// checks.
go agent.reapServices()
go a.reapServices()
// Start handling events.
go agent.handleEvents()
go a.handleEvents()
// Start sending network coordinate to the server.
if !config.DisableCoordinates {
go agent.sendCoordinate()
if !c.DisableCoordinates {
go a.sendCoordinate()
}
// Write out the PID file if necessary.
err = agent.storePid()
if err != nil {
return nil, err
if err := a.storePid(); err != nil {
return err
}
return agent, nil
// start dns server
if c.Ports.DNS > 0 {
srv, err := NewDNSServer(a, &c.DNSConfig, a.logOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors)
if err != nil {
return fmt.Errorf("error starting DNS server: %s", err)
}
a.dnsServers = []*DNSServer{srv}
}
// start HTTP servers
return a.startHTTP(a.httpAddrs)
}
func (a *Agent) startHTTP(httpAddrs map[string][]net.Addr) error {
// ln contains the list of pending listeners until the
// actual server is created and the listeners are used.
var ln []net.Listener
// cleanup the listeners on error. ln should be empty on success.
defer func() {
for _, l := range ln {
l.Close()
}
}()
// bind to the listeners for all addresses and protocols
// before we start the servers so that we can fail early
// if we can't bind to one of the addresses.
for proto, addrs := range httpAddrs {
for _, addr := range addrs {
switch addr.(type) {
case *net.UnixAddr:
switch proto {
case "http":
if _, err := os.Stat(addr.String()); !os.IsNotExist(err) {
a.logger.Printf("[WARN] agent: Replacing socket %q", addr.String())
}
l, err := ListenUnix(addr.String(), a.config.UnixSockets)
if err != nil {
return err
}
ln = append(ln, l)
default:
return fmt.Errorf("invalid protocol: %q", proto)
}
case *net.TCPAddr:
switch proto {
case "http":
l, err := ListenTCP(addr.String())
if err != nil {
return err
}
ln = append(ln, l)
case "https":
tlscfg, err := a.config.IncomingTLSConfig()
if err != nil {
return fmt.Errorf("invalid TLS configuration: %s", err)
}
l, err := ListenTLS(addr.String(), tlscfg)
if err != nil {
return err
}
ln = append(ln, l)
default:
return fmt.Errorf("invalid protocol: %q", proto)
}
default:
return fmt.Errorf("invalid address type: %T", addr)
}
}
}
// https://github.com/golang/go/issues/20239
//
// In go1.8.1 there is a race between Serve and Shutdown. If
// Shutdown is called before the Serve go routine was scheduled then
// the Serve go routine never returns. This deadlocks the agent
// shutdown for some tests since it will wait forever.
//
// We solve this with another WaitGroup which checks that the Serve
// go routine was called and after that it should be safe to call
// Shutdown on that server.
var up sync.WaitGroup
for _, l := range ln {
l := l // capture loop var
// create a server per listener instead of a single
// server with multiple listeners to take advantage
// of the Addr field for logging. Since the server
// does not keep state and they all share the same
// agent there is no overhead.
addr := l.Addr().String()
srv := NewHTTPServer(addr, a)
a.httpServers = append(a.httpServers, srv)
up.Add(1)
a.wgServers.Add(1)
go func() {
defer a.wgServers.Done()
up.Done()
a.logger.Printf("[INFO] agent: Starting HTTP server on %s", addr)
if err := srv.Serve(l); err != nil && err != http.ErrServerClosed {
a.logger.Print(err)
}
}()
}
up.Wait()
ln = nil
return nil
}
// consulConfig is used to return a consul configuration
@ -612,38 +770,36 @@ func (a *Agent) resolveTmplAddrs() error {
return nil
}
// setupServer is used to initialize the Consul server
func (a *Agent) setupServer() error {
// makeServer creates a new consul server.
func (a *Agent) makeServer() (*consul.Server, error) {
config, err := a.consulConfig()
if err != nil {
return err
return nil, err
}
if err := a.setupKeyrings(config); err != nil {
return fmt.Errorf("Failed to configure keyring: %v", err)
return nil, fmt.Errorf("Failed to configure keyring: %v", err)
}
server, err := consul.NewServer(config)
if err != nil {
return fmt.Errorf("Failed to start Consul server: %v", err)
return nil, fmt.Errorf("Failed to start Consul server: %v", err)
}
a.delegate = server
return nil
return server, nil
}
// setupClient is used to initialize the Consul client
func (a *Agent) setupClient() error {
// makeClient creates a new consul client.
func (a *Agent) makeClient() (*consul.Client, error) {
config, err := a.consulConfig()
if err != nil {
return err
return nil, err
}
if err := a.setupKeyrings(config); err != nil {
return fmt.Errorf("Failed to configure keyring: %v", err)
return nil, fmt.Errorf("Failed to configure keyring: %v", err)
}
client, err := consul.NewClient(config)
if err != nil {
return fmt.Errorf("Failed to start Consul client: %v", err)
return nil, fmt.Errorf("Failed to start Consul client: %v", err)
}
a.delegate = client
return nil
return client, nil
}
// makeRandomID will generate a random UUID for a node.
@ -830,6 +986,27 @@ func (a *Agent) Shutdown() error {
if a.shutdown {
return nil
}
a.logger.Println("[INFO] agent: Requesting shutdown")
// Stop all API endpoints
a.logger.Println("[INFO] agent: Stopping DNS endpoints")
for _, srv := range a.dnsServers {
srv.Shutdown()
}
for _, srv := range a.httpServers {
a.logger.Println("[INFO] agent: Stopping HTTP endpoint", srv.Addr)
// old behavior: just die
// srv.Close()
// graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
srv.Shutdown(ctx)
}
a.logger.Println("[INFO] agent: Waiting for endpoints to shut down")
a.wgServers.Wait()
a.logger.Print("[INFO] agent: Endpoints down")
// Stop all the checks
a.checkLock.Lock()
@ -849,8 +1026,8 @@ func (a *Agent) Shutdown() error {
chk.Stop()
}
a.logger.Println("[INFO] agent: requesting shutdown")
err := a.delegate.Shutdown()
a.logger.Print("[INFO] agent: delegate down")
pidErr := a.deletePid()
if pidErr != nil {

View File

@ -223,7 +223,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request
// only warn because the write did succeed and anti-entropy will sync later.
func (s *HTTPServer) syncChanges() {
if err := s.agent.state.syncChanges(); err != nil {
s.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err)
}
}
@ -654,7 +654,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
handler := &httpLogHandler{
filter: filter,
logCh: make(chan string, 512),
logger: s.logger,
logger: s.agent.logger,
}
s.agent.logWriter.RegisterHandler(handler)
defer s.agent.logWriter.DeregisterHandler(handler)

View File

@ -43,7 +43,6 @@ func makeReadOnlyAgentACL(t *testing.T, srv *HTTPServer) string {
func TestAgent_Services(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
srv1 := &structs.NodeService{
@ -71,7 +70,6 @@ func TestAgent_Services(t *testing.T) {
func TestAgent_Services_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -102,7 +100,6 @@ func TestAgent_Services_ACLFilter(t *testing.T) {
func TestAgent_Checks(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk1 := &structs.HealthCheck{
@ -130,7 +127,6 @@ func TestAgent_Checks(t *testing.T) {
func TestAgent_Checks_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk1 := &structs.HealthCheck{
@ -174,7 +170,6 @@ func TestAgent_Self(t *testing.T) {
conf.Meta = meta
})
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/self", nil)
@ -216,7 +211,6 @@ func TestAgent_Self(t *testing.T) {
func TestAgent_Self_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -285,7 +279,10 @@ func TestAgent_Reload(t *testing.T) {
}()
retry.Run(t, func(r *retry.R) {
if got, want := len(cmd.httpServers), 1; got != want {
if cmd == nil || cmd.agent == nil {
r.Fatal("waiting for agent")
}
if got, want := len(cmd.agent.httpServers), 1; got != want {
r.Fatalf("got %d servers want %d", got, want)
}
})
@ -299,7 +296,7 @@ func TestAgent_Reload(t *testing.T) {
t.Fatalf("err: %v", err)
}
srv := cmd.httpServers[0]
srv := cmd.agent.httpServers[0]
req, _ := http.NewRequest("PUT", "/v1/agent/reload", nil)
if _, err := srv.AgentReload(nil, req); err != nil {
t.Fatalf("Err: %v", err)
@ -313,7 +310,6 @@ func TestAgent_Reload(t *testing.T) {
func TestAgent_Reload_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -340,7 +336,6 @@ func TestAgent_Reload_ACLDeny(t *testing.T) {
func TestAgent_Members(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/members", nil)
@ -361,7 +356,6 @@ func TestAgent_Members(t *testing.T) {
func TestAgent_Members_WAN(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
req, _ := http.NewRequest("GET", "/v1/agent/members?wan=true", nil)
@ -382,7 +376,6 @@ func TestAgent_Members_WAN(t *testing.T) {
func TestAgent_Members_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -413,7 +406,6 @@ func TestAgent_Members_ACLFilter(t *testing.T) {
func TestAgent_Join(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig())
@ -444,7 +436,6 @@ func TestAgent_Join(t *testing.T) {
func TestAgent_Join_WAN(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig())
@ -475,7 +466,6 @@ func TestAgent_Join_WAN(t *testing.T) {
func TestAgent_Join_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig())
@ -510,7 +500,6 @@ func TestAgent_Join_ACLDeny(t *testing.T) {
func TestAgent_Leave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) {
@ -518,7 +507,7 @@ func TestAgent_Leave(t *testing.T) {
c.Bootstrap = false
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
// Join first
addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan)
@ -547,7 +536,6 @@ func TestAgent_Leave(t *testing.T) {
func TestAgent_Leave_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -578,7 +566,6 @@ func TestAgent_Leave_ACLDeny(t *testing.T) {
func TestAgent_ForceLeave(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
dir2, a2 := makeAgent(t, nextConfig())
@ -615,7 +602,6 @@ func TestAgent_ForceLeave(t *testing.T) {
func TestAgent_ForceLeave_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -644,7 +630,6 @@ func TestAgent_ForceLeave_ACLDeny(t *testing.T) {
func TestAgent_RegisterCheck(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Register node
@ -686,7 +671,6 @@ func TestAgent_RegisterCheck(t *testing.T) {
func TestAgent_RegisterCheck_Passing(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Register node
@ -723,7 +707,6 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) {
func TestAgent_RegisterCheck_BadStatus(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Register node
@ -745,7 +728,6 @@ func TestAgent_RegisterCheck_BadStatus(t *testing.T) {
func TestAgent_RegisterCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
args := &CheckDefinition{
@ -771,7 +753,6 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) {
func TestAgent_DeregisterCheck(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -798,7 +779,6 @@ func TestAgent_DeregisterCheck(t *testing.T) {
func TestAgent_DeregisterCheckACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -824,7 +804,6 @@ func TestAgent_DeregisterCheckACLDeny(t *testing.T) {
func TestAgent_PassCheck(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -852,7 +831,6 @@ func TestAgent_PassCheck(t *testing.T) {
func TestAgent_PassCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -879,7 +857,6 @@ func TestAgent_PassCheck_ACLDeny(t *testing.T) {
func TestAgent_WarnCheck(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -907,7 +884,6 @@ func TestAgent_WarnCheck(t *testing.T) {
func TestAgent_WarnCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -934,7 +910,6 @@ func TestAgent_WarnCheck_ACLDeny(t *testing.T) {
func TestAgent_FailCheck(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -962,7 +937,6 @@ func TestAgent_FailCheck(t *testing.T) {
func TestAgent_FailCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -989,7 +963,6 @@ func TestAgent_FailCheck_ACLDeny(t *testing.T) {
func TestAgent_UpdateCheck(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -1089,7 +1062,6 @@ func TestAgent_UpdateCheck(t *testing.T) {
func TestAgent_UpdateCheck_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
chk := &structs.HealthCheck{Name: "test", CheckID: "test"}
@ -1118,7 +1090,6 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) {
func TestAgent_RegisterService(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
args := &ServiceDefinition{
@ -1171,7 +1142,6 @@ func TestAgent_RegisterService(t *testing.T) {
func TestAgent_RegisterService_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
args := &ServiceDefinition{
@ -1209,7 +1179,6 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) {
func TestAgent_RegisterService_InvalidAddress(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
for _, addr := range []string{"0.0.0.0", "::", "[::]"} {
@ -1238,7 +1207,6 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) {
func TestAgent_DeregisterService(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
service := &structs.NodeService{
@ -1271,7 +1239,6 @@ func TestAgent_DeregisterService(t *testing.T) {
func TestAgent_DeregisterService_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
service := &structs.NodeService{
@ -1300,7 +1267,6 @@ func TestAgent_DeregisterService_ACLDeny(t *testing.T) {
func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("not PUT", func(t *testing.T) {
@ -1351,7 +1317,6 @@ func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) {
func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Register the service
@ -1394,7 +1359,6 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) {
func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Register the service
@ -1431,7 +1395,6 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) {
func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Register the service.
@ -1461,7 +1424,6 @@ func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) {
func TestAgent_NodeMaintenance_BadRequest(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Fails on non-PUT
@ -1488,7 +1450,6 @@ func TestAgent_NodeMaintenance_BadRequest(t *testing.T) {
func TestAgent_NodeMaintenance_Enable(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Force the node into maintenance mode
@ -1521,7 +1482,6 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) {
func TestAgent_NodeMaintenance_Disable(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Force the node into maintenance mode
@ -1546,7 +1506,6 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) {
func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
t.Run("no token", func(t *testing.T) {
@ -1567,7 +1526,6 @@ func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) {
func TestAgent_RegisterCheck_Service(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
args := &ServiceDefinition{
@ -1616,7 +1574,6 @@ func TestAgent_Monitor(t *testing.T) {
dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Try passing an invalid log level
@ -1678,7 +1635,6 @@ func (r *closableRecorder) CloseNotify() <-chan bool {
func TestAgent_Monitor_ACLDeny(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Try without a token.

View File

@ -17,7 +17,6 @@ import (
func TestCatalogRegister(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -53,7 +52,6 @@ func TestCatalogRegister(t *testing.T) {
func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -81,7 +79,6 @@ func TestCatalogRegister_Service_InvalidAddress(t *testing.T) {
func TestCatalogDeregister(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -103,7 +100,6 @@ func TestCatalogDeregister(t *testing.T) {
func TestCatalogDatacenters(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
retry.Run(t, func(r *retry.R) {
@ -122,7 +118,6 @@ func TestCatalogDatacenters(t *testing.T) {
func TestCatalogNodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -158,7 +153,6 @@ func TestCatalogNodes(t *testing.T) {
func TestCatalogNodes_MetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -206,7 +200,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -217,7 +210,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")
@ -302,7 +294,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) {
func TestCatalogNodes_Blocking(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -358,7 +349,6 @@ func TestCatalogNodes_Blocking(t *testing.T) {
func TestCatalogNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -445,7 +435,6 @@ func TestCatalogNodes_DistanceSort(t *testing.T) {
func TestCatalogServices(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -483,7 +472,6 @@ func TestCatalogServices(t *testing.T) {
func TestCatalogServices_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -527,7 +515,6 @@ func TestCatalogServices_NodeMetaFilter(t *testing.T) {
func TestCatalogServiceNodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -583,7 +570,6 @@ func TestCatalogServiceNodes(t *testing.T) {
func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -646,7 +632,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -657,7 +642,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")
@ -734,7 +718,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -824,7 +807,6 @@ func TestCatalogServiceNodes_DistanceSort(t *testing.T) {
func TestCatalogNodeServices(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -867,7 +849,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -878,7 +859,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")

View File

@ -50,8 +50,6 @@ type Command struct {
logFilter *logutils.LevelFilter
logOutput io.Writer
agent *Agent
httpServers []*HTTPServer
dnsServer *DNSServer
}
// readConfig is responsible for setup of our configuration using
@ -455,70 +453,6 @@ func (c *Command) readConfig() *Config {
return config
}
// setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error {
c.UI.Output("Starting Consul agent...")
agent, err := Create(config, logOutput, logWriter, c.configReloadCh)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
return err
}
c.agent = agent
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent)
if err != nil {
agent.Shutdown()
c.UI.Error(fmt.Sprintf("Error starting http servers: %s", err))
return err
}
c.httpServers = servers
}
if config.Ports.DNS > 0 {
dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS)
if err != nil {
agent.Shutdown()
c.UI.Error(fmt.Sprintf("Invalid DNS bind address: %s", err))
return err
}
server, err := NewDNSServer(agent, &config.DNSConfig, logOutput,
config.Domain, dnsAddr.String(), config.DNSRecursors)
if err != nil {
agent.Shutdown()
c.UI.Error(fmt.Sprintf("Error starting dns server: %s", err))
return err
}
c.dnsServer = server
}
// Setup update checking
if !config.DisableUpdateCheck {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
}
updateParams := &checkpoint.CheckParams{
Product: "consul",
Version: version,
}
if !config.DisableAnonymousSignature {
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
return nil
}
// checkpointResults is used to handler periodic results from our update checker
func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) {
if err != nil {
@ -806,16 +740,39 @@ func (c *Command) Run(args []string) int {
}
// Create the agent
if err := c.setupAgent(config, logOutput, logWriter); err != nil {
c.UI.Output("Starting Consul agent...")
agent, err := Create(config, logOutput, logWriter, c.configReloadCh)
if err != nil {
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err))
return 1
}
c.agent = agent
// Setup update checking
if !config.DisableUpdateCheck {
version := config.Version
if config.VersionPrerelease != "" {
version += fmt.Sprintf("-%s", config.VersionPrerelease)
}
updateParams := &checkpoint.CheckParams{
Product: "consul",
Version: version,
}
if !config.DisableAnonymousSignature {
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature")
}
// Schedule a periodic check with expected interval of 24 hours
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults)
// Do an immediate check within the next 30 seconds
go func() {
time.Sleep(lib.RandomStagger(30 * time.Second))
c.checkpointResults(checkpoint.Check(updateParams))
}()
}
defer c.agent.Shutdown()
if c.dnsServer != nil {
defer c.dnsServer.Shutdown()
}
for _, server := range c.httpServers {
defer server.Shutdown()
}
// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
@ -831,7 +788,6 @@ func (c *Command) Run(args []string) int {
// Get the new client http listener addr
var httpAddr net.Addr
var err error
if config.Ports.HTTP != -1 {
httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
} else if config.Ports.HTTPS != -1 {

View File

@ -1,6 +1,7 @@
package agent
import (
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
@ -760,6 +761,47 @@ type Config struct {
DeprecatedAtlasEndpoint string `mapstructure:"atlas_endpoint" json:"-"`
}
// IncomingTLSConfig returns the TLS configuration for TLS
// connections to consul.
func (c *Config) IncomingTLSConfig() (*tls.Config, error) {
tc := &tlsutil.Config{
VerifyIncoming: c.VerifyIncoming || c.VerifyIncomingHTTPS,
VerifyOutgoing: c.VerifyOutgoing,
CAFile: c.CAFile,
CAPath: c.CAPath,
CertFile: c.CertFile,
KeyFile: c.KeyFile,
NodeName: c.NodeName,
ServerName: c.ServerName,
TLSMinVersion: c.TLSMinVersion,
CipherSuites: c.TLSCipherSuites,
PreferServerCipherSuites: c.TLSPreferServerCipherSuites,
}
return tc.IncomingTLSConfig()
}
// HTTPAddrs returns the bind addresses for the HTTP server and
// the application protocol which should be served, e.g. 'http'
// or 'https'.
func (c *Config) HTTPAddrs() (map[string][]net.Addr, error) {
m := map[string][]net.Addr{}
if c.Ports.HTTP > 0 {
a, err := c.ClientListener(c.Addresses.HTTP, c.Ports.HTTP)
if err != nil {
return nil, err
}
m["http"] = []net.Addr{a}
}
if c.Ports.HTTPS > 0 {
a, err := c.ClientListener(c.Addresses.HTTPS, c.Ports.HTTPS)
if err != nil {
return nil, err
}
m["https"] = []net.Addr{a}
}
return m, nil
}
// Bool is used to initialize bool pointers in struct literals.
func Bool(b bool) *bool {
return &b
@ -914,13 +956,10 @@ func (c *Config) EncryptBytes() ([]byte, error) {
// ClientListener is used to format a listener for a
// port on a ClientAddr
func (c *Config) ClientListener(override string, port int) (net.Addr, error) {
var addr string
addr := c.ClientAddr
if override != "" {
addr = override
} else {
addr = c.ClientAddr
}
if path := socketPath(addr); path != "" {
return &net.UnixAddr{Name: path, Net: "unix"}, nil
}

View File

@ -15,7 +15,6 @@ import (
func TestCoordinate_Datacenters(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -39,7 +38,6 @@ func TestCoordinate_Datacenters(t *testing.T) {
func TestCoordinate_Nodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")

View File

@ -34,35 +34,25 @@ func makeDNSServer(t *testing.T) (string, *DNSServer) {
return makeDNSServerConfig(t, nil, nil)
}
func makeDNSServerConfig(
t *testing.T,
agentFn func(c *Config),
dnsFn func(*DNSConfig)) (string, *DNSServer) {
func makeDNSServerConfig(t *testing.T, agentFn func(c *Config), dnsFn func(*DNSConfig)) (string, *DNSServer) {
// Create the configs and apply the functions
agentConf := nextConfig()
c := nextConfig()
if agentFn != nil {
agentFn(agentConf)
agentFn(c)
}
dnsConf := &DefaultConfig().DNSConfig
c.DNSConfig = DefaultConfig().DNSConfig
if dnsFn != nil {
dnsFn(dnsConf)
dnsFn(&c.DNSConfig)
}
// Add in the recursor if any
if r := agentConf.DNSRecursor; r != "" {
agentConf.DNSRecursors = append(agentConf.DNSRecursors, r)
if r := c.DNSRecursor; r != "" {
c.DNSRecursors = append(c.DNSRecursors, r)
}
// Start the server
addr, _ := agentConf.ClientListener(agentConf.Addresses.DNS, agentConf.Ports.DNS)
dir, agent := makeAgent(t, agentConf)
server, err := NewDNSServer(agent, dnsConf, agent.logOutput,
agentConf.Domain, addr.String(), agentConf.DNSRecursors)
if err != nil {
t.Fatalf("err: %v", err)
}
return dir, server
dir, agent := makeAgent(t, c)
return dir, agent.dnsServers[0]
}
// makeRecursor creates a generic DNS server which always returns
@ -1283,7 +1273,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) {
c.ACLDatacenter = ""
}, nil)
defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown()
dir2, srv2 := makeDNSServerConfig(t, func(c *Config) {
c.Datacenter = "dc2"
@ -1291,7 +1281,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) {
c.ACLDatacenter = ""
}, nil)
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")
@ -2421,6 +2411,9 @@ func TestDNS_ServiceLookup_OnlyPassing(t *testing.T) {
}
// Only 1 is passing, so we should only get 1 answer
for _, a := range in.Answer {
fmt.Println(question, a)
}
if len(in.Answer) != 1 {
t.Fatalf("Bad: %#v", in)
}
@ -3364,14 +3357,14 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) {
c.TranslateWanAddrs = true
}, nil)
defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown()
dir2, srv2 := makeDNSServerConfig(t, func(c *Config) {
c.Datacenter = "dc2"
c.TranslateWanAddrs = true
}, nil)
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")

View File

@ -178,7 +178,6 @@ func TestEventList_Filter(t *testing.T) {
func TestEventList_ACLFilter(t *testing.T) {
dir, srv := makeHTTPServerWithACLs(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Fire an event.

View File

@ -98,7 +98,6 @@ func TestHealthChecksInState_NodeMetaFilter(t *testing.T) {
func TestHealthChecksInState_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -175,7 +174,6 @@ func TestHealthChecksInState_DistanceSort(t *testing.T) {
func TestHealthNodeChecks(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -212,7 +210,6 @@ func TestHealthNodeChecks(t *testing.T) {
func TestHealthServiceChecks(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -266,7 +263,6 @@ func TestHealthServiceChecks(t *testing.T) {
func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -321,7 +317,6 @@ func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
func TestHealthServiceChecks_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -403,7 +398,6 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) {
func TestHealthServiceNodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -472,7 +466,6 @@ func TestHealthServiceNodes(t *testing.T) {
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -527,7 +520,6 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
func TestHealthServiceNodes_DistanceSort(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -609,7 +601,6 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) {
func TestHealthServiceNodes_PassingFilter(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -656,7 +647,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir1)
defer srv1.Shutdown()
defer srv1.agent.Shutdown()
testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1")
@ -667,7 +657,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) {
c.ACLDatacenter = ""
})
defer os.RemoveAll(dir2)
defer srv2.Shutdown()
defer srv2.agent.Shutdown()
testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2")

View File

@ -1,175 +1,39 @@
package agent
import (
"crypto/tls"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"net/http/pprof"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/tlsutil"
"github.com/mitchellh/mapstructure"
)
// HTTPServer is used to wrap an Agent and expose various API's
// in a RESTful manner
// HTTPServer provides an HTTP api for an agent.
type HTTPServer struct {
*http.Server
agent *Agent
mux *http.ServeMux
listener net.Listener
logger *log.Logger
addr string
}
// NewHTTPServers starts new HTTP servers to provide an interface to
// the agent.
func NewHTTPServers(agent *Agent) ([]*HTTPServer, error) {
config := agent.config
logOutput := agent.logOutput
func NewHTTPServer(addr string, a *Agent) *HTTPServer {
s := &HTTPServer{&http.Server{Addr: addr}, a}
s.Server.Handler = s.handler(s.agent.config.EnableDebug)
return s
}
var servers []*HTTPServer
if config.Ports.HTTPS > 0 {
httpAddr, err := config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS)
if err != nil {
return nil, err
}
tlsConf := &tlsutil.Config{
VerifyIncoming: config.VerifyIncoming || config.VerifyIncomingHTTPS,
VerifyOutgoing: config.VerifyOutgoing,
CAFile: config.CAFile,
CAPath: config.CAPath,
CertFile: config.CertFile,
KeyFile: config.KeyFile,
NodeName: config.NodeName,
ServerName: config.ServerName,
TLSMinVersion: config.TLSMinVersion,
CipherSuites: config.TLSCipherSuites,
PreferServerCipherSuites: config.TLSPreferServerCipherSuites,
}
tlsConfig, err := tlsConf.IncomingTLSConfig()
if err != nil {
return nil, err
}
ln, err := net.Listen(httpAddr.Network(), httpAddr.String())
if err != nil {
return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err)
}
list := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig)
// Create the mux
// handler is used to attach our handlers to the mux
func (s *HTTPServer) handler(enableDebug bool) http.Handler {
mux := http.NewServeMux()
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: log.New(logOutput, "", log.LstdFlags),
addr: httpAddr.String(),
}
srv.registerHandlers(config.EnableDebug)
// Start the server
go http.Serve(list, mux)
servers = append(servers, srv)
}
if config.Ports.HTTP > 0 {
httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
if err != nil {
return nil, fmt.Errorf("Failed to get ClientListener address:port: %v", err)
}
// Error if we are trying to bind a domain socket to an existing path
path := socketPath(config.Addresses.HTTP)
if path != "" {
if _, err := os.Stat(path); !os.IsNotExist(err) {
agent.logger.Printf("[WARN] agent: Replacing socket %q", path)
}
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("error removing socket file: %s", err)
}
}
ln, err := net.Listen(httpAddr.Network(), httpAddr.String())
if err != nil {
return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err)
}
var list net.Listener
if path != "" {
// Set up ownership/permission bits on the socket file
if err := setFilePermissions(path, config.UnixSockets); err != nil {
return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err)
}
list = ln
} else {
list = tcpKeepAliveListener{ln.(*net.TCPListener)}
}
// Create the mux
mux := http.NewServeMux()
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: log.New(logOutput, "", log.LstdFlags),
addr: httpAddr.String(),
}
srv.registerHandlers(config.EnableDebug)
// Start the server
go http.Serve(list, mux)
servers = append(servers, srv)
}
return servers, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by NewHttpServer so
// dead TCP connections eventually go away.
type tcpKeepAliveListener struct {
*net.TCPListener
}
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
tc, err := ln.AcceptTCP()
if err != nil {
return
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}
// Shutdown is used to shutdown the HTTP server
func (s *HTTPServer) Shutdown() {
if s != nil {
s.logger.Printf("[DEBUG] http: Shutting down http server (%v)", s.addr)
s.listener.Close()
}
}
// handleFuncMetrics takes the given pattern and handler and wraps to produce
// metrics based on the pattern and request.
func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.ResponseWriter, *http.Request)) {
// handleFuncMetrics takes the given pattern and handler and wraps to produce
// metrics based on the pattern and request.
handleFuncMetrics := func(pattern string, handler http.HandlerFunc) {
// Get the parts of the pattern. We omit any initial empty for the
// leading slash, and put an underscore as a "thing" placeholder if we
// see a trailing slash, which means the part after is parsed. This lets
@ -179,9 +43,8 @@ func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.Respons
if part == "" {
if i == 0 {
continue
} else {
part = "_"
}
part = "_"
}
parts = append(parts, part)
}
@ -191,110 +54,108 @@ func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.Respons
wrapper := func(resp http.ResponseWriter, req *http.Request) {
start := time.Now()
handler(resp, req)
key := append([]string{"consul", "http", req.Method}, parts...)
metrics.MeasureSince(key, start)
}
s.mux.HandleFunc(pattern, wrapper)
}
mux.HandleFunc(pattern, wrapper)
}
// registerHandlers is used to attach our handlers to the mux
func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/", s.Index)
mux.HandleFunc("/", s.Index)
// API V1.
if s.agent.config.ACLDatacenter != "" {
s.handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate))
s.handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate))
s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy))
s.handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet))
s.handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone))
s.handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList))
s.handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate))
handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate))
handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy))
handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet))
handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone))
handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList))
handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus))
} else {
s.handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled))
s.handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled))
handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled))
}
s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
s.handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload))
s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin))
s.handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave))
s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave))
s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck))
s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck))
s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass))
s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn))
s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail))
s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate))
s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService))
s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance))
s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister))
s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices))
s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf))
handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance))
handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload))
handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor))
handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices))
handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks))
handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers))
handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin))
handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave))
handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave))
handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck))
handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck))
handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass))
handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn))
handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail))
handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate))
handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService))
handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService))
handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance))
handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister))
handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister))
handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters))
handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes))
handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices))
handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes))
handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices))
if !s.agent.config.DisableCoordinates {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters))
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes))
} else {
s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled))
handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled))
}
s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks))
s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState))
s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes))
s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes))
s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo))
s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices))
s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint))
s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
s.handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy))
s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew))
s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet))
s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode))
s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList))
s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader))
s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers))
s.handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot))
s.handleFuncMetrics("/v1/txn", s.wrap(s.Txn))
handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire))
handleFuncMetrics("/v1/event/list", s.wrap(s.EventList))
handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks))
handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks))
handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState))
handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes))
handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes))
handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo))
handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices))
handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint))
handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration))
handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer))
handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint))
handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration))
handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth))
handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral))
handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific))
handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate))
handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy))
handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew))
handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet))
handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode))
handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList))
handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader))
handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers))
handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot))
handleFuncMetrics("/v1/txn", s.wrap(s.Txn))
// Debug endpoints.
if enableDebug {
s.handleFuncMetrics("/debug/pprof/", pprof.Index)
s.handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline)
s.handleFuncMetrics("/debug/pprof/profile", pprof.Profile)
s.handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol)
handleFuncMetrics("/debug/pprof/", pprof.Index)
handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline)
handleFuncMetrics("/debug/pprof/profile", pprof.Profile)
handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol)
}
// Use the custom UI dir if provided.
if s.agent.config.UIDir != "" {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir))))
mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir))))
} else if s.agent.config.EnableUI {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS())))
mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS())))
}
return mux
}
// wrap is used to wrap functions to make them more convenient
@ -306,7 +167,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
// Obfuscate any tokens from appearing in the logs
formVals, err := url.ParseQuery(req.URL.RawQuery)
if err != nil {
s.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr)
s.agent.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr)
resp.WriteHeader(http.StatusInternalServerError) // 500
return
}
@ -322,7 +183,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
}
handleErr := func(err error) {
s.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr)
s.agent.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr)
code := http.StatusInternalServerError // 500
errMsg := err.Error()
if strings.Contains(errMsg, "Permission denied") || strings.Contains(errMsg, "ACL not found") {
@ -344,7 +205,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque
// Invoke the handler
start := time.Now()
defer func() {
s.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr)
s.agent.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr)
}()
obj, err := handler(resp, req)
if err != nil {

View File

@ -51,26 +51,13 @@ func makeHTTPServerWithACLs(t *testing.T) (string, *HTTPServer) {
}
func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) {
configTry := 0
RECONF:
configTry++
conf := nextConfig()
if cb != nil {
cb(conf)
}
dir, agent := makeAgentLog(t, conf, l, logWriter)
servers, err := NewHTTPServers(agent)
if err != nil {
if configTry < 3 {
goto RECONF
}
t.Fatalf("err: %v", err)
}
if len(servers) == 0 {
t.Fatalf(fmt.Sprintf("Failed to make HTTP server"))
}
return dir, servers[0]
return dir, agent.httpServers[0]
}
func TestHTTPServer_UnixSocket(t *testing.T) {
@ -91,7 +78,6 @@ func TestHTTPServer_UnixSocket(t *testing.T) {
c.UnixSockets.Perms = "0777"
})
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Ensure the socket was created
@ -158,12 +144,9 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) {
dir, agent := makeAgent(t, conf)
defer os.RemoveAll(dir)
defer agent.Shutdown()
// Try to start the server with the same path anyways.
if _, err := NewHTTPServers(agent); err != nil {
t.Fatalf("err: %s", err)
}
defer agent.Shutdown()
// Ensure the file was replaced by the socket
fi, err = os.Stat(socket)
if err != nil {
@ -238,7 +221,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) {
{
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
resp := httptest.NewRecorder()
@ -260,7 +242,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) {
dir, srv := makeHTTPServer(t)
srv.agent.config.TranslateWanAddrs = true
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
resp := httptest.NewRecorder()
@ -286,7 +267,6 @@ func TestHTTPAPIResponseHeaders(t *testing.T) {
}
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
resp := httptest.NewRecorder()
@ -313,7 +293,6 @@ func TestContentTypeIsJSON(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
resp := httptest.NewRecorder()
@ -336,12 +315,11 @@ func TestContentTypeIsJSON(t *testing.T) {
func TestHTTP_wrap_obfuscateLog(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Attach a custom logger so we can inspect it
buf := &bytes.Buffer{}
srv.logger = log.New(buf, "", log.LstdFlags)
srv.agent.logger = log.New(buf, "", log.LstdFlags)
resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/some/url?token=secret1&token=secret2", nil)
@ -367,7 +345,6 @@ func TestPrettyPrintBare(t *testing.T) {
func testPrettyPrint(pretty string, t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
r := &structs.DirEntry{Key: "key"}
@ -396,7 +373,6 @@ func testPrettyPrint(pretty string, t *testing.T) {
func TestParseSource(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Default is agent's DC and no node (since the user didn't care, then
@ -578,7 +554,7 @@ func TestEnableWebUI(t *testing.T) {
req, _ := http.NewRequest("GET", "/ui/", nil)
// Perform the request
resp := httptest.NewRecorder()
s.mux.ServeHTTP(resp, req)
s.Handler.ServeHTTP(resp, req)
// Check the result
if resp.Code != 200 {
@ -626,7 +602,6 @@ func httpTest(t *testing.T, f func(srv *HTTPServer)) {
func httpTestWithConfig(t *testing.T, f func(srv *HTTPServer), cb func(c *Config)) {
dir, srv := makeHTTPServerWithConfig(t, cb)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
f(srv)

View File

@ -16,7 +16,6 @@ import (
func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -78,7 +77,6 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) {
func TestKVSEndpoint_Recurse(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -157,7 +155,6 @@ func TestKVSEndpoint_Recurse(t *testing.T) {
func TestKVSEndpoint_DELETE_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -226,7 +223,6 @@ func TestKVSEndpoint_DELETE_CAS(t *testing.T) {
func TestKVSEndpoint_CAS(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -305,7 +301,6 @@ func TestKVSEndpoint_CAS(t *testing.T) {
func TestKVSEndpoint_ListKeys(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")

61
command/agent/listen.go Normal file
View File

@ -0,0 +1,61 @@
package agent
import (
"crypto/tls"
"fmt"
"net"
"os"
"time"
)
func ListenTCP(addr string) (net.Listener, error) {
l, err := net.Listen("tcp", addr)
if err != nil {
return nil, err
}
l = tcpKeepAliveListener{l.(*net.TCPListener)}
return l, nil
}
func ListenTLS(addr string, cfg *tls.Config) (net.Listener, error) {
l, err := ListenTCP(addr)
if err != nil {
return nil, err
}
return tls.NewListener(l, cfg), nil
}
func ListenUnix(addr string, perm FilePermissions) (net.Listener, error) {
// todo(fs): move this somewhere else
// if _, err := os.Stat(addr); !os.IsNotExist(err) {
// s.agent.logger.Printf("[WARN] agent: Replacing socket %q", addr)
// }
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("error removing socket file: %s", err)
}
l, err := net.Listen("unix", addr)
if err != nil {
return nil, err
}
if err := setFilePermissions(addr, perm); err != nil {
return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err)
}
return l, nil
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by NewHttpServer so
// dead TCP connections eventually go away.
type tcpKeepAliveListener struct {
*net.TCPListener
}
func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) {
tc, err := ln.AcceptTCP()
if err != nil {
return
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(30 * time.Second)
return tc, nil
}

View File

@ -10,7 +10,6 @@ import (
func TestStatusLeader(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -28,7 +27,6 @@ func TestStatusLeader(t *testing.T) {
func TestStatusPeers(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
obj, err := srv.StatusPeers(nil, nil)

View File

@ -29,7 +29,6 @@ func TestUiIndex(t *testing.T) {
c.UIDir = uiDir
})
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
// Create file
@ -41,7 +40,7 @@ func TestUiIndex(t *testing.T) {
// Register node
req, _ := http.NewRequest("GET", "/ui/my-file", nil)
req.URL.Scheme = "http"
req.URL.Host = srv.listener.Addr().String()
req.URL.Host = srv.Addr
// Make the request
client := cleanhttp.DefaultClient()
@ -66,7 +65,6 @@ func TestUiIndex(t *testing.T) {
func TestUiNodes(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")
@ -106,7 +104,6 @@ func TestUiNodes(t *testing.T) {
func TestUiNodeInfo(t *testing.T) {
dir, srv := makeHTTPServer(t)
defer os.RemoveAll(dir)
defer srv.Shutdown()
defer srv.agent.Shutdown()
testrpc.WaitForLeader(t, srv.agent.RPC, "dc1")

View File

@ -20,8 +20,6 @@ import (
"github.com/mitchellh/cli"
)
var offset uint64
func init() {
// Seed the random number generator
rand.Seed(time.Now().UnixNano())
@ -29,25 +27,23 @@ func init() {
version.Version = "0.8.0"
}
type agentWrapper struct {
dir string
config *agent.Config
type server struct {
agent *agent.Agent
http *agent.HTTPServer
config *agent.Config
httpAddr string
dir string
}
func (a *agentWrapper) Shutdown() {
func (a *server) Shutdown() {
a.agent.Shutdown()
a.http.Shutdown()
os.RemoveAll(a.dir)
}
func testAgent(t *testing.T) *agentWrapper {
func testAgent(t *testing.T) *server {
return testAgentWithConfig(t, nil)
}
func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) {
func testAgentWithAPIClient(t *testing.T) (*server, *api.Client) {
agent := testAgentWithConfig(t, func(c *agent.Config) {})
client, err := api.NewClient(&api.Config{Address: agent.httpAddr})
if err != nil {
@ -56,71 +52,54 @@ func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) {
return agent, client
}
func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper {
func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *server {
return testAgentWithConfigReload(t, cb, nil)
}
func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *agentWrapper {
lw := logger.NewLogWriter(512)
func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *server {
conf := nextConfig()
if cb != nil {
cb(conf)
}
dir := testutil.TempDir(t, "agent")
conf.DataDir = dir
a, err := agent.Create(conf, lw, nil, reloadCh)
conf.DataDir = testutil.TempDir(t, "agent")
a, err := agent.Create(conf, logger.NewLogWriter(512), nil, reloadCh)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
os.RemoveAll(conf.DataDir)
t.Fatalf("err: %v", err)
}
conf.Addresses.HTTP = "127.0.0.1"
httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP)
http, err := agent.NewHTTPServers(a)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))
}
if http == nil || len(http) == 0 {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("Could not create HTTP server to listen on: %s", httpAddr))
}
return &agentWrapper{
dir: dir,
config: conf,
agent: a,
http: http[0],
httpAddr: httpAddr,
}
addr := fmt.Sprintf("%s:%d", conf.Addresses.HTTP, conf.Ports.HTTP)
return &server{agent: a, config: conf, httpAddr: addr, dir: conf.DataDir}
}
func nextConfig() *agent.Config {
idx := int(atomic.AddUint64(&offset, 1))
conf := agent.DefaultConfig()
var nextPort uint64 = 10000
func nextConfig() *agent.Config {
nodeID, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}
port := int(atomic.AddUint64(&nextPort, 10))
conf := agent.DefaultConfig()
conf.Bootstrap = true
conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx)
conf.NodeName = fmt.Sprintf("Node %d", port)
conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1"
conf.Server = true
conf.Version = version.Version
conf.Ports.HTTP = 10000 + 10*idx
conf.Ports.HTTPS = 10401 + 10*idx
conf.Ports.SerfLan = 10201 + 10*idx
conf.Ports.SerfWan = 10202 + 10*idx
conf.Ports.Server = 10300 + 10*idx
conf.Ports = agent.PortConfig{
DNS: port + 1,
HTTP: port + 2,
HTTPS: port + 3,
SerfLan: port + 4,
SerfWan: port + 5,
Server: port + 6,
}
cons := consul.DefaultConfig()
conf.ConsulConfig = cons