diff --git a/command/agent/agent.go b/command/agent/agent.go index 827ccd2c40..1c492bca58 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -1,6 +1,7 @@ package agent import ( + "context" "crypto/sha512" "encoding/json" "errors" @@ -9,6 +10,7 @@ import ( "io/ioutil" "log" "net" + "net/http" "os" "path/filepath" "reflect" @@ -141,27 +143,62 @@ type Agent struct { // agent methods use this, so use with care and never override // outside of a unit test. endpoints map[string]string + + // dnsAddr is the address the DNS server binds to + dnsAddr net.Addr + + // dnsServer provides the DNS API + dnsServers []*DNSServer + + // httpAddrs are the addresses per protocol the HTTP server binds to + httpAddrs map[string][]net.Addr + + // httpServers provides the HTTP API on various endpoints + httpServers []*HTTPServer + + // wgServers is the wait group for all HTTP and DNS servers + wgServers sync.WaitGroup } // Create is used to create a new Agent. Returns // the agent or potentially an error. -func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { - // Ensure we have a log sink +func Create(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { + a, err := NewAgent(c, logOutput, logWriter, reloadCh) + if err != nil { + return nil, err + } + if err := a.Start(); err != nil { + return nil, err + } + return a, nil +} + +func NewAgent(c *Config, logOutput io.Writer, logWriter *logger.LogWriter, reloadCh chan chan error) (*Agent, error) { if logOutput == nil { logOutput = os.Stderr } - - // Validate the config - if config.Datacenter == "" { + if c.Datacenter == "" { return nil, fmt.Errorf("Must configure a Datacenter") } - if config.DataDir == "" && !config.DevMode { + if c.DataDir == "" && !c.DevMode { return nil, fmt.Errorf("Must configure a DataDir") } + dnsAddr, err := c.ClientListener(c.Addresses.DNS, c.Ports.DNS) + if err != nil { + return nil, fmt.Errorf("Invalid DNS bind address: %s", err) + } + httpAddrs, err := c.HTTPAddrs() + if err != nil { + return nil, fmt.Errorf("Invalid HTTP bind address: %s", err) + } + acls, err := newACLManager(c) + if err != nil { + return nil, err + } - agent := &Agent{ - config: config, - logger: log.New(logOutput, "", log.LstdFlags), + a := &Agent{ + config: c, + acls: acls, logOutput: logOutput, logWriter: logWriter, checkReapAfter: make(map[types.CheckID]time.Duration), @@ -175,79 +212,200 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter, re reloadCh: reloadCh, shutdownCh: make(chan struct{}), endpoints: make(map[string]string), + dnsAddr: dnsAddr, + httpAddrs: httpAddrs, } - if err := agent.resolveTmplAddrs(); err != nil { + if err := a.resolveTmplAddrs(); err != nil { return nil, err } + return a, nil +} - // Initialize the ACL manager. - acls, err := newACLManager(config) - if err != nil { - return nil, err - } - agent.acls = acls +func (a *Agent) Start() error { + c := a.config + + a.logger = log.New(a.logOutput, "", log.LstdFlags) // Retrieve or generate the node ID before setting up the rest of the // agent, which depends on it. - if err := agent.setupNodeID(config); err != nil { - return nil, fmt.Errorf("Failed to setup node ID: %v", err) + if err := a.setupNodeID(c); err != nil { + return fmt.Errorf("Failed to setup node ID: %v", err) } // Initialize the local state. - agent.state.Init(config, agent.logger) + a.state.Init(c, a.logger) // Setup either the client or the server. - if config.Server { - err = agent.setupServer() - agent.state.SetIface(agent.delegate) + if c.Server { + server, err := a.makeServer() + if err != nil { + return err + } + + a.delegate = server + a.state.SetIface(server) // Automatically register the "consul" service on server nodes consulService := structs.NodeService{ Service: consul.ConsulServiceName, ID: consul.ConsulServiceID, - Port: agent.config.Ports.Server, + Port: c.Ports.Server, Tags: []string{}, } - agent.state.AddService(&consulService, agent.config.GetTokenForAgent()) + a.state.AddService(&consulService, c.GetTokenForAgent()) } else { - err = agent.setupClient() - agent.state.SetIface(agent.delegate) - } - if err != nil { - return nil, err + client, err := a.makeClient() + if err != nil { + return err + } + + a.delegate = client + a.state.SetIface(client) } // Load checks/services/metadata. - if err := agent.loadServices(config); err != nil { - return nil, err + if err := a.loadServices(c); err != nil { + return err } - if err := agent.loadChecks(config); err != nil { - return nil, err + if err := a.loadChecks(c); err != nil { + return err } - if err := agent.loadMetadata(config); err != nil { - return nil, err + if err := a.loadMetadata(c); err != nil { + return err } // Start watching for critical services to deregister, based on their // checks. - go agent.reapServices() + go a.reapServices() // Start handling events. - go agent.handleEvents() + go a.handleEvents() // Start sending network coordinate to the server. - if !config.DisableCoordinates { - go agent.sendCoordinate() + if !c.DisableCoordinates { + go a.sendCoordinate() } // Write out the PID file if necessary. - err = agent.storePid() - if err != nil { - return nil, err + if err := a.storePid(); err != nil { + return err } - return agent, nil + // start dns server + if c.Ports.DNS > 0 { + srv, err := NewDNSServer(a, &c.DNSConfig, a.logOutput, c.Domain, a.dnsAddr.String(), c.DNSRecursors) + if err != nil { + return fmt.Errorf("error starting DNS server: %s", err) + } + a.dnsServers = []*DNSServer{srv} + } + + // start HTTP servers + return a.startHTTP(a.httpAddrs) +} + +func (a *Agent) startHTTP(httpAddrs map[string][]net.Addr) error { + // ln contains the list of pending listeners until the + // actual server is created and the listeners are used. + var ln []net.Listener + + // cleanup the listeners on error. ln should be empty on success. + defer func() { + for _, l := range ln { + l.Close() + } + }() + + // bind to the listeners for all addresses and protocols + // before we start the servers so that we can fail early + // if we can't bind to one of the addresses. + for proto, addrs := range httpAddrs { + for _, addr := range addrs { + switch addr.(type) { + case *net.UnixAddr: + switch proto { + case "http": + if _, err := os.Stat(addr.String()); !os.IsNotExist(err) { + a.logger.Printf("[WARN] agent: Replacing socket %q", addr.String()) + } + l, err := ListenUnix(addr.String(), a.config.UnixSockets) + if err != nil { + return err + } + ln = append(ln, l) + + default: + return fmt.Errorf("invalid protocol: %q", proto) + } + + case *net.TCPAddr: + switch proto { + case "http": + l, err := ListenTCP(addr.String()) + if err != nil { + return err + } + ln = append(ln, l) + + case "https": + tlscfg, err := a.config.IncomingTLSConfig() + if err != nil { + return fmt.Errorf("invalid TLS configuration: %s", err) + } + l, err := ListenTLS(addr.String(), tlscfg) + if err != nil { + return err + } + ln = append(ln, l) + + default: + return fmt.Errorf("invalid protocol: %q", proto) + } + + default: + return fmt.Errorf("invalid address type: %T", addr) + } + } + } + + // https://github.com/golang/go/issues/20239 + // + // In go1.8.1 there is a race between Serve and Shutdown. If + // Shutdown is called before the Serve go routine was scheduled then + // the Serve go routine never returns. This deadlocks the agent + // shutdown for some tests since it will wait forever. + // + // We solve this with another WaitGroup which checks that the Serve + // go routine was called and after that it should be safe to call + // Shutdown on that server. + var up sync.WaitGroup + for _, l := range ln { + l := l // capture loop var + + // create a server per listener instead of a single + // server with multiple listeners to take advantage + // of the Addr field for logging. Since the server + // does not keep state and they all share the same + // agent there is no overhead. + addr := l.Addr().String() + srv := NewHTTPServer(addr, a) + a.httpServers = append(a.httpServers, srv) + + up.Add(1) + a.wgServers.Add(1) + go func() { + defer a.wgServers.Done() + up.Done() + a.logger.Printf("[INFO] agent: Starting HTTP server on %s", addr) + if err := srv.Serve(l); err != nil && err != http.ErrServerClosed { + a.logger.Print(err) + } + }() + } + up.Wait() + ln = nil + return nil } // consulConfig is used to return a consul configuration @@ -612,38 +770,36 @@ func (a *Agent) resolveTmplAddrs() error { return nil } -// setupServer is used to initialize the Consul server -func (a *Agent) setupServer() error { +// makeServer creates a new consul server. +func (a *Agent) makeServer() (*consul.Server, error) { config, err := a.consulConfig() if err != nil { - return err + return nil, err } if err := a.setupKeyrings(config); err != nil { - return fmt.Errorf("Failed to configure keyring: %v", err) + return nil, fmt.Errorf("Failed to configure keyring: %v", err) } server, err := consul.NewServer(config) if err != nil { - return fmt.Errorf("Failed to start Consul server: %v", err) + return nil, fmt.Errorf("Failed to start Consul server: %v", err) } - a.delegate = server - return nil + return server, nil } -// setupClient is used to initialize the Consul client -func (a *Agent) setupClient() error { +// makeClient creates a new consul client. +func (a *Agent) makeClient() (*consul.Client, error) { config, err := a.consulConfig() if err != nil { - return err + return nil, err } if err := a.setupKeyrings(config); err != nil { - return fmt.Errorf("Failed to configure keyring: %v", err) + return nil, fmt.Errorf("Failed to configure keyring: %v", err) } client, err := consul.NewClient(config) if err != nil { - return fmt.Errorf("Failed to start Consul client: %v", err) + return nil, fmt.Errorf("Failed to start Consul client: %v", err) } - a.delegate = client - return nil + return client, nil } // makeRandomID will generate a random UUID for a node. @@ -830,6 +986,27 @@ func (a *Agent) Shutdown() error { if a.shutdown { return nil } + a.logger.Println("[INFO] agent: Requesting shutdown") + + // Stop all API endpoints + a.logger.Println("[INFO] agent: Stopping DNS endpoints") + for _, srv := range a.dnsServers { + srv.Shutdown() + } + for _, srv := range a.httpServers { + a.logger.Println("[INFO] agent: Stopping HTTP endpoint", srv.Addr) + + // old behavior: just die + // srv.Close() + + // graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + srv.Shutdown(ctx) + } + a.logger.Println("[INFO] agent: Waiting for endpoints to shut down") + a.wgServers.Wait() + a.logger.Print("[INFO] agent: Endpoints down") // Stop all the checks a.checkLock.Lock() @@ -849,8 +1026,8 @@ func (a *Agent) Shutdown() error { chk.Stop() } - a.logger.Println("[INFO] agent: requesting shutdown") err := a.delegate.Shutdown() + a.logger.Print("[INFO] agent: delegate down") pidErr := a.deletePid() if pidErr != nil { diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index 628ad403dc..f67805a312 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -223,7 +223,7 @@ func (s *HTTPServer) AgentForceLeave(resp http.ResponseWriter, req *http.Request // only warn because the write did succeed and anti-entropy will sync later. func (s *HTTPServer) syncChanges() { if err := s.agent.state.syncChanges(); err != nil { - s.logger.Printf("[ERR] agent: failed to sync changes: %v", err) + s.agent.logger.Printf("[ERR] agent: failed to sync changes: %v", err) } } @@ -654,7 +654,7 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( handler := &httpLogHandler{ filter: filter, logCh: make(chan string, 512), - logger: s.logger, + logger: s.agent.logger, } s.agent.logWriter.RegisterHandler(handler) defer s.agent.logWriter.DeregisterHandler(handler) diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index a876c617d6..825c55fc8b 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -43,7 +43,6 @@ func makeReadOnlyAgentACL(t *testing.T, srv *HTTPServer) string { func TestAgent_Services(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() srv1 := &structs.NodeService{ @@ -71,7 +70,6 @@ func TestAgent_Services(t *testing.T) { func TestAgent_Services_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -102,7 +100,6 @@ func TestAgent_Services_ACLFilter(t *testing.T) { func TestAgent_Checks(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk1 := &structs.HealthCheck{ @@ -130,7 +127,6 @@ func TestAgent_Checks(t *testing.T) { func TestAgent_Checks_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk1 := &structs.HealthCheck{ @@ -174,7 +170,6 @@ func TestAgent_Self(t *testing.T) { conf.Meta = meta }) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/self", nil) @@ -216,7 +211,6 @@ func TestAgent_Self(t *testing.T) { func TestAgent_Self_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -285,7 +279,10 @@ func TestAgent_Reload(t *testing.T) { }() retry.Run(t, func(r *retry.R) { - if got, want := len(cmd.httpServers), 1; got != want { + if cmd == nil || cmd.agent == nil { + r.Fatal("waiting for agent") + } + if got, want := len(cmd.agent.httpServers), 1; got != want { r.Fatalf("got %d servers want %d", got, want) } }) @@ -299,7 +296,7 @@ func TestAgent_Reload(t *testing.T) { t.Fatalf("err: %v", err) } - srv := cmd.httpServers[0] + srv := cmd.agent.httpServers[0] req, _ := http.NewRequest("PUT", "/v1/agent/reload", nil) if _, err := srv.AgentReload(nil, req); err != nil { t.Fatalf("Err: %v", err) @@ -313,7 +310,6 @@ func TestAgent_Reload(t *testing.T) { func TestAgent_Reload_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -340,7 +336,6 @@ func TestAgent_Reload_ACLDeny(t *testing.T) { func TestAgent_Members(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/members", nil) @@ -361,7 +356,6 @@ func TestAgent_Members(t *testing.T) { func TestAgent_Members_WAN(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() req, _ := http.NewRequest("GET", "/v1/agent/members?wan=true", nil) @@ -382,7 +376,6 @@ func TestAgent_Members_WAN(t *testing.T) { func TestAgent_Members_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -413,7 +406,6 @@ func TestAgent_Members_ACLFilter(t *testing.T) { func TestAgent_Join(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -444,7 +436,6 @@ func TestAgent_Join(t *testing.T) { func TestAgent_Join_WAN(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -475,7 +466,6 @@ func TestAgent_Join_WAN(t *testing.T) { func TestAgent_Join_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -510,7 +500,6 @@ func TestAgent_Join_ACLDeny(t *testing.T) { func TestAgent_Leave(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, srv2 := makeHTTPServerWithConfig(t, func(c *Config) { @@ -518,7 +507,7 @@ func TestAgent_Leave(t *testing.T) { c.Bootstrap = false }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() + defer srv2.agent.Shutdown() // Join first addr := fmt.Sprintf("127.0.0.1:%d", srv2.agent.config.Ports.SerfLan) @@ -547,7 +536,6 @@ func TestAgent_Leave(t *testing.T) { func TestAgent_Leave_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -578,7 +566,6 @@ func TestAgent_Leave_ACLDeny(t *testing.T) { func TestAgent_ForceLeave(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() dir2, a2 := makeAgent(t, nextConfig()) @@ -615,7 +602,6 @@ func TestAgent_ForceLeave(t *testing.T) { func TestAgent_ForceLeave_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -644,7 +630,6 @@ func TestAgent_ForceLeave_ACLDeny(t *testing.T) { func TestAgent_RegisterCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register node @@ -686,7 +671,6 @@ func TestAgent_RegisterCheck(t *testing.T) { func TestAgent_RegisterCheck_Passing(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register node @@ -723,7 +707,6 @@ func TestAgent_RegisterCheck_Passing(t *testing.T) { func TestAgent_RegisterCheck_BadStatus(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register node @@ -745,7 +728,6 @@ func TestAgent_RegisterCheck_BadStatus(t *testing.T) { func TestAgent_RegisterCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &CheckDefinition{ @@ -771,7 +753,6 @@ func TestAgent_RegisterCheck_ACLDeny(t *testing.T) { func TestAgent_DeregisterCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -798,7 +779,6 @@ func TestAgent_DeregisterCheck(t *testing.T) { func TestAgent_DeregisterCheckACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -824,7 +804,6 @@ func TestAgent_DeregisterCheckACLDeny(t *testing.T) { func TestAgent_PassCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -852,7 +831,6 @@ func TestAgent_PassCheck(t *testing.T) { func TestAgent_PassCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -879,7 +857,6 @@ func TestAgent_PassCheck_ACLDeny(t *testing.T) { func TestAgent_WarnCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -907,7 +884,6 @@ func TestAgent_WarnCheck(t *testing.T) { func TestAgent_WarnCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -934,7 +910,6 @@ func TestAgent_WarnCheck_ACLDeny(t *testing.T) { func TestAgent_FailCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -962,7 +937,6 @@ func TestAgent_FailCheck(t *testing.T) { func TestAgent_FailCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -989,7 +963,6 @@ func TestAgent_FailCheck_ACLDeny(t *testing.T) { func TestAgent_UpdateCheck(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -1089,7 +1062,6 @@ func TestAgent_UpdateCheck(t *testing.T) { func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() chk := &structs.HealthCheck{Name: "test", CheckID: "test"} @@ -1118,7 +1090,6 @@ func TestAgent_UpdateCheck_ACLDeny(t *testing.T) { func TestAgent_RegisterService(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &ServiceDefinition{ @@ -1171,7 +1142,6 @@ func TestAgent_RegisterService(t *testing.T) { func TestAgent_RegisterService_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &ServiceDefinition{ @@ -1209,7 +1179,6 @@ func TestAgent_RegisterService_ACLDeny(t *testing.T) { func TestAgent_RegisterService_InvalidAddress(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() for _, addr := range []string{"0.0.0.0", "::", "[::]"} { @@ -1238,7 +1207,6 @@ func TestAgent_RegisterService_InvalidAddress(t *testing.T) { func TestAgent_DeregisterService(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() service := &structs.NodeService{ @@ -1271,7 +1239,6 @@ func TestAgent_DeregisterService(t *testing.T) { func TestAgent_DeregisterService_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() service := &structs.NodeService{ @@ -1300,7 +1267,6 @@ func TestAgent_DeregisterService_ACLDeny(t *testing.T) { func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("not PUT", func(t *testing.T) { @@ -1351,7 +1317,6 @@ func TestAgent_ServiceMaintenance_BadRequest(t *testing.T) { func TestAgent_ServiceMaintenance_Enable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register the service @@ -1394,7 +1359,6 @@ func TestAgent_ServiceMaintenance_Enable(t *testing.T) { func TestAgent_ServiceMaintenance_Disable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register the service @@ -1431,7 +1395,6 @@ func TestAgent_ServiceMaintenance_Disable(t *testing.T) { func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Register the service. @@ -1461,7 +1424,6 @@ func TestAgent_ServiceMaintenance_ACLDeny(t *testing.T) { func TestAgent_NodeMaintenance_BadRequest(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Fails on non-PUT @@ -1488,7 +1450,6 @@ func TestAgent_NodeMaintenance_BadRequest(t *testing.T) { func TestAgent_NodeMaintenance_Enable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Force the node into maintenance mode @@ -1521,7 +1482,6 @@ func TestAgent_NodeMaintenance_Enable(t *testing.T) { func TestAgent_NodeMaintenance_Disable(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Force the node into maintenance mode @@ -1546,7 +1506,6 @@ func TestAgent_NodeMaintenance_Disable(t *testing.T) { func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() t.Run("no token", func(t *testing.T) { @@ -1567,7 +1526,6 @@ func TestAgent_NodeMaintenance_ACLDeny(t *testing.T) { func TestAgent_RegisterCheck_Service(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() args := &ServiceDefinition{ @@ -1616,7 +1574,6 @@ func TestAgent_Monitor(t *testing.T) { dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Try passing an invalid log level @@ -1678,7 +1635,6 @@ func (r *closableRecorder) CloseNotify() <-chan bool { func TestAgent_Monitor_ACLDeny(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Try without a token. diff --git a/command/agent/catalog_endpoint_test.go b/command/agent/catalog_endpoint_test.go index d0bc340d5f..41556b8f92 100644 --- a/command/agent/catalog_endpoint_test.go +++ b/command/agent/catalog_endpoint_test.go @@ -17,7 +17,6 @@ import ( func TestCatalogRegister(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -53,7 +52,6 @@ func TestCatalogRegister(t *testing.T) { func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -81,7 +79,6 @@ func TestCatalogRegister_Service_InvalidAddress(t *testing.T) { func TestCatalogDeregister(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -103,7 +100,6 @@ func TestCatalogDeregister(t *testing.T) { func TestCatalogDatacenters(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() retry.Run(t, func(r *retry.R) { @@ -122,7 +118,6 @@ func TestCatalogDatacenters(t *testing.T) { func TestCatalogNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -158,7 +153,6 @@ func TestCatalogNodes(t *testing.T) { func TestCatalogNodes_MetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -206,7 +200,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -217,7 +210,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") @@ -302,7 +294,6 @@ func TestCatalogNodes_WanTranslation(t *testing.T) { func TestCatalogNodes_Blocking(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -358,7 +349,6 @@ func TestCatalogNodes_Blocking(t *testing.T) { func TestCatalogNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -445,7 +435,6 @@ func TestCatalogNodes_DistanceSort(t *testing.T) { func TestCatalogServices(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -483,7 +472,6 @@ func TestCatalogServices(t *testing.T) { func TestCatalogServices_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -527,7 +515,6 @@ func TestCatalogServices_NodeMetaFilter(t *testing.T) { func TestCatalogServiceNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -583,7 +570,6 @@ func TestCatalogServiceNodes(t *testing.T) { func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -646,7 +632,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -657,7 +642,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") @@ -734,7 +718,6 @@ func TestCatalogServiceNodes_WanTranslation(t *testing.T) { func TestCatalogServiceNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -824,7 +807,6 @@ func TestCatalogServiceNodes_DistanceSort(t *testing.T) { func TestCatalogNodeServices(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -867,7 +849,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -878,7 +859,6 @@ func TestCatalogNodeServices_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") diff --git a/command/agent/command.go b/command/agent/command.go index 4bc415e2d7..fa7ff547ef 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -50,8 +50,6 @@ type Command struct { logFilter *logutils.LevelFilter logOutput io.Writer agent *Agent - httpServers []*HTTPServer - dnsServer *DNSServer } // readConfig is responsible for setup of our configuration using @@ -455,70 +453,6 @@ func (c *Command) readConfig() *Config { return config } -// setupAgent is used to start the agent and various interfaces -func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { - c.UI.Output("Starting Consul agent...") - agent, err := Create(config, logOutput, logWriter, c.configReloadCh) - if err != nil { - c.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) - return err - } - c.agent = agent - - if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 { - servers, err := NewHTTPServers(agent) - if err != nil { - agent.Shutdown() - c.UI.Error(fmt.Sprintf("Error starting http servers: %s", err)) - return err - } - c.httpServers = servers - } - - if config.Ports.DNS > 0 { - dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS) - if err != nil { - agent.Shutdown() - c.UI.Error(fmt.Sprintf("Invalid DNS bind address: %s", err)) - return err - } - - server, err := NewDNSServer(agent, &config.DNSConfig, logOutput, - config.Domain, dnsAddr.String(), config.DNSRecursors) - if err != nil { - agent.Shutdown() - c.UI.Error(fmt.Sprintf("Error starting dns server: %s", err)) - return err - } - c.dnsServer = server - } - - // Setup update checking - if !config.DisableUpdateCheck { - version := config.Version - if config.VersionPrerelease != "" { - version += fmt.Sprintf("-%s", config.VersionPrerelease) - } - updateParams := &checkpoint.CheckParams{ - Product: "consul", - Version: version, - } - if !config.DisableAnonymousSignature { - updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature") - } - - // Schedule a periodic check with expected interval of 24 hours - checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults) - - // Do an immediate check within the next 30 seconds - go func() { - time.Sleep(lib.RandomStagger(30 * time.Second)) - c.checkpointResults(checkpoint.Check(updateParams)) - }() - } - return nil -} - // checkpointResults is used to handler periodic results from our update checker func (c *Command) checkpointResults(results *checkpoint.CheckResponse, err error) { if err != nil { @@ -806,16 +740,39 @@ func (c *Command) Run(args []string) int { } // Create the agent - if err := c.setupAgent(config, logOutput, logWriter); err != nil { + c.UI.Output("Starting Consul agent...") + agent, err := Create(config, logOutput, logWriter, c.configReloadCh) + if err != nil { + c.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) return 1 } + c.agent = agent + + // Setup update checking + if !config.DisableUpdateCheck { + version := config.Version + if config.VersionPrerelease != "" { + version += fmt.Sprintf("-%s", config.VersionPrerelease) + } + updateParams := &checkpoint.CheckParams{ + Product: "consul", + Version: version, + } + if !config.DisableAnonymousSignature { + updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature") + } + + // Schedule a periodic check with expected interval of 24 hours + checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults) + + // Do an immediate check within the next 30 seconds + go func() { + time.Sleep(lib.RandomStagger(30 * time.Second)) + c.checkpointResults(checkpoint.Check(updateParams)) + }() + } + defer c.agent.Shutdown() - if c.dnsServer != nil { - defer c.dnsServer.Shutdown() - } - for _, server := range c.httpServers { - defer server.Shutdown() - } // Join startup nodes if specified if err := c.startupJoin(config); err != nil { @@ -831,7 +788,6 @@ func (c *Command) Run(args []string) int { // Get the new client http listener addr var httpAddr net.Addr - var err error if config.Ports.HTTP != -1 { httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) } else if config.Ports.HTTPS != -1 { diff --git a/command/agent/config.go b/command/agent/config.go index c49af33f00..07e67e1424 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -1,6 +1,7 @@ package agent import ( + "crypto/tls" "encoding/base64" "encoding/json" "fmt" @@ -760,6 +761,47 @@ type Config struct { DeprecatedAtlasEndpoint string `mapstructure:"atlas_endpoint" json:"-"` } +// IncomingTLSConfig returns the TLS configuration for TLS +// connections to consul. +func (c *Config) IncomingTLSConfig() (*tls.Config, error) { + tc := &tlsutil.Config{ + VerifyIncoming: c.VerifyIncoming || c.VerifyIncomingHTTPS, + VerifyOutgoing: c.VerifyOutgoing, + CAFile: c.CAFile, + CAPath: c.CAPath, + CertFile: c.CertFile, + KeyFile: c.KeyFile, + NodeName: c.NodeName, + ServerName: c.ServerName, + TLSMinVersion: c.TLSMinVersion, + CipherSuites: c.TLSCipherSuites, + PreferServerCipherSuites: c.TLSPreferServerCipherSuites, + } + return tc.IncomingTLSConfig() +} + +// HTTPAddrs returns the bind addresses for the HTTP server and +// the application protocol which should be served, e.g. 'http' +// or 'https'. +func (c *Config) HTTPAddrs() (map[string][]net.Addr, error) { + m := map[string][]net.Addr{} + if c.Ports.HTTP > 0 { + a, err := c.ClientListener(c.Addresses.HTTP, c.Ports.HTTP) + if err != nil { + return nil, err + } + m["http"] = []net.Addr{a} + } + if c.Ports.HTTPS > 0 { + a, err := c.ClientListener(c.Addresses.HTTPS, c.Ports.HTTPS) + if err != nil { + return nil, err + } + m["https"] = []net.Addr{a} + } + return m, nil +} + // Bool is used to initialize bool pointers in struct literals. func Bool(b bool) *bool { return &b @@ -914,13 +956,10 @@ func (c *Config) EncryptBytes() ([]byte, error) { // ClientListener is used to format a listener for a // port on a ClientAddr func (c *Config) ClientListener(override string, port int) (net.Addr, error) { - var addr string + addr := c.ClientAddr if override != "" { addr = override - } else { - addr = c.ClientAddr } - if path := socketPath(addr); path != "" { return &net.UnixAddr{Name: path, Net: "unix"}, nil } diff --git a/command/agent/coordinate_endpoint_test.go b/command/agent/coordinate_endpoint_test.go index b24c1a1fca..64fafce66f 100644 --- a/command/agent/coordinate_endpoint_test.go +++ b/command/agent/coordinate_endpoint_test.go @@ -15,7 +15,6 @@ import ( func TestCoordinate_Datacenters(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -39,7 +38,6 @@ func TestCoordinate_Datacenters(t *testing.T) { func TestCoordinate_Nodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") diff --git a/command/agent/dns_test.go b/command/agent/dns_test.go index 9e23586c85..21941fb126 100644 --- a/command/agent/dns_test.go +++ b/command/agent/dns_test.go @@ -34,35 +34,25 @@ func makeDNSServer(t *testing.T) (string, *DNSServer) { return makeDNSServerConfig(t, nil, nil) } -func makeDNSServerConfig( - t *testing.T, - agentFn func(c *Config), - dnsFn func(*DNSConfig)) (string, *DNSServer) { +func makeDNSServerConfig(t *testing.T, agentFn func(c *Config), dnsFn func(*DNSConfig)) (string, *DNSServer) { // Create the configs and apply the functions - agentConf := nextConfig() + c := nextConfig() if agentFn != nil { - agentFn(agentConf) + agentFn(c) } - dnsConf := &DefaultConfig().DNSConfig + c.DNSConfig = DefaultConfig().DNSConfig if dnsFn != nil { - dnsFn(dnsConf) + dnsFn(&c.DNSConfig) } // Add in the recursor if any - if r := agentConf.DNSRecursor; r != "" { - agentConf.DNSRecursors = append(agentConf.DNSRecursors, r) + if r := c.DNSRecursor; r != "" { + c.DNSRecursors = append(c.DNSRecursors, r) } // Start the server - addr, _ := agentConf.ClientListener(agentConf.Addresses.DNS, agentConf.Ports.DNS) - dir, agent := makeAgent(t, agentConf) - server, err := NewDNSServer(agent, dnsConf, agent.logOutput, - agentConf.Domain, addr.String(), agentConf.DNSRecursors) - if err != nil { - t.Fatalf("err: %v", err) - } - - return dir, server + dir, agent := makeAgent(t, c) + return dir, agent.dnsServers[0] } // makeRecursor creates a generic DNS server which always returns @@ -1283,7 +1273,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) { c.ACLDatacenter = "" }, nil) defer os.RemoveAll(dir1) - defer srv1.Shutdown() + defer srv1.agent.Shutdown() dir2, srv2 := makeDNSServerConfig(t, func(c *Config) { c.Datacenter = "dc2" @@ -1291,7 +1281,7 @@ func TestDNS_ServiceLookup_WanAddress(t *testing.T) { c.ACLDatacenter = "" }, nil) defer os.RemoveAll(dir2) - defer srv2.Shutdown() + defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") @@ -2421,6 +2411,9 @@ func TestDNS_ServiceLookup_OnlyPassing(t *testing.T) { } // Only 1 is passing, so we should only get 1 answer + for _, a := range in.Answer { + fmt.Println(question, a) + } if len(in.Answer) != 1 { t.Fatalf("Bad: %#v", in) } @@ -3364,14 +3357,14 @@ func TestDNS_PreparedQuery_Failover(t *testing.T) { c.TranslateWanAddrs = true }, nil) defer os.RemoveAll(dir1) - defer srv1.Shutdown() + defer srv1.agent.Shutdown() dir2, srv2 := makeDNSServerConfig(t, func(c *Config) { c.Datacenter = "dc2" c.TranslateWanAddrs = true }, nil) defer os.RemoveAll(dir2) - defer srv2.Shutdown() + defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index ee8a7484db..c04e9c6a3c 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -178,7 +178,6 @@ func TestEventList_Filter(t *testing.T) { func TestEventList_ACLFilter(t *testing.T) { dir, srv := makeHTTPServerWithACLs(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Fire an event. diff --git a/command/agent/health_endpoint_test.go b/command/agent/health_endpoint_test.go index 13f165d65e..e364b249bd 100644 --- a/command/agent/health_endpoint_test.go +++ b/command/agent/health_endpoint_test.go @@ -98,7 +98,6 @@ func TestHealthChecksInState_NodeMetaFilter(t *testing.T) { func TestHealthChecksInState_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -175,7 +174,6 @@ func TestHealthChecksInState_DistanceSort(t *testing.T) { func TestHealthNodeChecks(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -212,7 +210,6 @@ func TestHealthNodeChecks(t *testing.T) { func TestHealthServiceChecks(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -266,7 +263,6 @@ func TestHealthServiceChecks(t *testing.T) { func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -321,7 +317,6 @@ func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) { func TestHealthServiceChecks_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -403,7 +398,6 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) { func TestHealthServiceNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -472,7 +466,6 @@ func TestHealthServiceNodes(t *testing.T) { func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -527,7 +520,6 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) { func TestHealthServiceNodes_DistanceSort(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -609,7 +601,6 @@ func TestHealthServiceNodes_DistanceSort(t *testing.T) { func TestHealthServiceNodes_PassingFilter(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -656,7 +647,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir1) - defer srv1.Shutdown() defer srv1.agent.Shutdown() testrpc.WaitForLeader(t, srv1.agent.RPC, "dc1") @@ -667,7 +657,6 @@ func TestHealthServiceNodes_WanTranslation(t *testing.T) { c.ACLDatacenter = "" }) defer os.RemoveAll(dir2) - defer srv2.Shutdown() defer srv2.agent.Shutdown() testrpc.WaitForLeader(t, srv2.agent.RPC, "dc2") diff --git a/command/agent/http.go b/command/agent/http.go index e81bf7b7b6..91d98bead3 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -1,300 +1,161 @@ package agent import ( - "crypto/tls" "encoding/json" "fmt" - "log" - "net" "net/http" "net/http/pprof" "net/url" - "os" "strconv" "strings" "time" "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" - "github.com/hashicorp/consul/tlsutil" "github.com/mitchellh/mapstructure" ) -// HTTPServer is used to wrap an Agent and expose various API's -// in a RESTful manner +// HTTPServer provides an HTTP api for an agent. type HTTPServer struct { - agent *Agent - mux *http.ServeMux - listener net.Listener - logger *log.Logger - addr string + *http.Server + agent *Agent } -// NewHTTPServers starts new HTTP servers to provide an interface to -// the agent. -func NewHTTPServers(agent *Agent) ([]*HTTPServer, error) { - config := agent.config - logOutput := agent.logOutput - - var servers []*HTTPServer - - if config.Ports.HTTPS > 0 { - httpAddr, err := config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS) - if err != nil { - return nil, err - } - - tlsConf := &tlsutil.Config{ - VerifyIncoming: config.VerifyIncoming || config.VerifyIncomingHTTPS, - VerifyOutgoing: config.VerifyOutgoing, - CAFile: config.CAFile, - CAPath: config.CAPath, - CertFile: config.CertFile, - KeyFile: config.KeyFile, - NodeName: config.NodeName, - ServerName: config.ServerName, - TLSMinVersion: config.TLSMinVersion, - CipherSuites: config.TLSCipherSuites, - PreferServerCipherSuites: config.TLSPreferServerCipherSuites, - } - - tlsConfig, err := tlsConf.IncomingTLSConfig() - if err != nil { - return nil, err - } - - ln, err := net.Listen(httpAddr.Network(), httpAddr.String()) - if err != nil { - return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err) - } - - list := tls.NewListener(tcpKeepAliveListener{ln.(*net.TCPListener)}, tlsConfig) - - // Create the mux - mux := http.NewServeMux() - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: list, - logger: log.New(logOutput, "", log.LstdFlags), - addr: httpAddr.String(), - } - srv.registerHandlers(config.EnableDebug) - - // Start the server - go http.Serve(list, mux) - servers = append(servers, srv) - } - - if config.Ports.HTTP > 0 { - httpAddr, err := config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP) - if err != nil { - return nil, fmt.Errorf("Failed to get ClientListener address:port: %v", err) - } - - // Error if we are trying to bind a domain socket to an existing path - path := socketPath(config.Addresses.HTTP) - if path != "" { - if _, err := os.Stat(path); !os.IsNotExist(err) { - agent.logger.Printf("[WARN] agent: Replacing socket %q", path) - } - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - return nil, fmt.Errorf("error removing socket file: %s", err) - } - } - - ln, err := net.Listen(httpAddr.Network(), httpAddr.String()) - if err != nil { - return nil, fmt.Errorf("Failed to get Listen on %s: %v", httpAddr.String(), err) - } - - var list net.Listener - if path != "" { - // Set up ownership/permission bits on the socket file - if err := setFilePermissions(path, config.UnixSockets); err != nil { - return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err) - } - list = ln - } else { - list = tcpKeepAliveListener{ln.(*net.TCPListener)} - } - - // Create the mux - mux := http.NewServeMux() - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: list, - logger: log.New(logOutput, "", log.LstdFlags), - addr: httpAddr.String(), - } - srv.registerHandlers(config.EnableDebug) - - // Start the server - go http.Serve(list, mux) - servers = append(servers, srv) - } - - return servers, nil +func NewHTTPServer(addr string, a *Agent) *HTTPServer { + s := &HTTPServer{&http.Server{Addr: addr}, a} + s.Server.Handler = s.handler(s.agent.config.EnableDebug) + return s } -// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted -// connections. It's used by NewHttpServer so -// dead TCP connections eventually go away. -type tcpKeepAliveListener struct { - *net.TCPListener -} +// handler is used to attach our handlers to the mux +func (s *HTTPServer) handler(enableDebug bool) http.Handler { + mux := http.NewServeMux() -func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { - tc, err := ln.AcceptTCP() - if err != nil { - return - } - tc.SetKeepAlive(true) - tc.SetKeepAlivePeriod(30 * time.Second) - return tc, nil -} - -// Shutdown is used to shutdown the HTTP server -func (s *HTTPServer) Shutdown() { - if s != nil { - s.logger.Printf("[DEBUG] http: Shutting down http server (%v)", s.addr) - s.listener.Close() - } -} - -// handleFuncMetrics takes the given pattern and handler and wraps to produce -// metrics based on the pattern and request. -func (s *HTTPServer) handleFuncMetrics(pattern string, handler func(http.ResponseWriter, *http.Request)) { - // Get the parts of the pattern. We omit any initial empty for the - // leading slash, and put an underscore as a "thing" placeholder if we - // see a trailing slash, which means the part after is parsed. This lets - // us distinguish from things like /v1/query and /v1/query/. - var parts []string - for i, part := range strings.Split(pattern, "/") { - if part == "" { - if i == 0 { - continue - } else { + // handleFuncMetrics takes the given pattern and handler and wraps to produce + // metrics based on the pattern and request. + handleFuncMetrics := func(pattern string, handler http.HandlerFunc) { + // Get the parts of the pattern. We omit any initial empty for the + // leading slash, and put an underscore as a "thing" placeholder if we + // see a trailing slash, which means the part after is parsed. This lets + // us distinguish from things like /v1/query and /v1/query/. + var parts []string + for i, part := range strings.Split(pattern, "/") { + if part == "" { + if i == 0 { + continue + } part = "_" } + parts = append(parts, part) } - parts = append(parts, part) + + // Register the wrapper, which will close over the expensive-to-compute + // parts from above. + wrapper := func(resp http.ResponseWriter, req *http.Request) { + start := time.Now() + handler(resp, req) + key := append([]string{"consul", "http", req.Method}, parts...) + metrics.MeasureSince(key, start) + } + mux.HandleFunc(pattern, wrapper) } - // Register the wrapper, which will close over the expensive-to-compute - // parts from above. - wrapper := func(resp http.ResponseWriter, req *http.Request) { - start := time.Now() - handler(resp, req) - - key := append([]string{"consul", "http", req.Method}, parts...) - metrics.MeasureSince(key, start) - } - s.mux.HandleFunc(pattern, wrapper) -} - -// registerHandlers is used to attach our handlers to the mux -func (s *HTTPServer) registerHandlers(enableDebug bool) { - s.mux.HandleFunc("/", s.Index) + mux.HandleFunc("/", s.Index) // API V1. if s.agent.config.ACLDatacenter != "" { - s.handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate)) - s.handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate)) - s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy)) - s.handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet)) - s.handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone)) - s.handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList)) - s.handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) + handleFuncMetrics("/v1/acl/create", s.wrap(s.ACLCreate)) + handleFuncMetrics("/v1/acl/update", s.wrap(s.ACLUpdate)) + handleFuncMetrics("/v1/acl/destroy/", s.wrap(s.ACLDestroy)) + handleFuncMetrics("/v1/acl/info/", s.wrap(s.ACLGet)) + handleFuncMetrics("/v1/acl/clone/", s.wrap(s.ACLClone)) + handleFuncMetrics("/v1/acl/list", s.wrap(s.ACLList)) + handleFuncMetrics("/v1/acl/replication", s.wrap(s.ACLReplicationStatus)) } else { - s.handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled)) - s.handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/create", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/update", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/destroy/", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/info/", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/clone/", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/list", s.wrap(ACLDisabled)) + handleFuncMetrics("/v1/acl/replication", s.wrap(ACLDisabled)) } - s.handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) - s.handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) - s.handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) - s.handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) - s.handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) - s.handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) - s.handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) - s.handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) - s.handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave)) - s.handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) - s.handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) - s.handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) - s.handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) - s.handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) - s.handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) - s.handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) - s.handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) - s.handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) - s.handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) - s.handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) - s.handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) - s.handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) - s.handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) - s.handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) - s.handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) - s.handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) + handleFuncMetrics("/v1/agent/self", s.wrap(s.AgentSelf)) + handleFuncMetrics("/v1/agent/maintenance", s.wrap(s.AgentNodeMaintenance)) + handleFuncMetrics("/v1/agent/reload", s.wrap(s.AgentReload)) + handleFuncMetrics("/v1/agent/monitor", s.wrap(s.AgentMonitor)) + handleFuncMetrics("/v1/agent/services", s.wrap(s.AgentServices)) + handleFuncMetrics("/v1/agent/checks", s.wrap(s.AgentChecks)) + handleFuncMetrics("/v1/agent/members", s.wrap(s.AgentMembers)) + handleFuncMetrics("/v1/agent/join/", s.wrap(s.AgentJoin)) + handleFuncMetrics("/v1/agent/leave", s.wrap(s.AgentLeave)) + handleFuncMetrics("/v1/agent/force-leave/", s.wrap(s.AgentForceLeave)) + handleFuncMetrics("/v1/agent/check/register", s.wrap(s.AgentRegisterCheck)) + handleFuncMetrics("/v1/agent/check/deregister/", s.wrap(s.AgentDeregisterCheck)) + handleFuncMetrics("/v1/agent/check/pass/", s.wrap(s.AgentCheckPass)) + handleFuncMetrics("/v1/agent/check/warn/", s.wrap(s.AgentCheckWarn)) + handleFuncMetrics("/v1/agent/check/fail/", s.wrap(s.AgentCheckFail)) + handleFuncMetrics("/v1/agent/check/update/", s.wrap(s.AgentCheckUpdate)) + handleFuncMetrics("/v1/agent/service/register", s.wrap(s.AgentRegisterService)) + handleFuncMetrics("/v1/agent/service/deregister/", s.wrap(s.AgentDeregisterService)) + handleFuncMetrics("/v1/agent/service/maintenance/", s.wrap(s.AgentServiceMaintenance)) + handleFuncMetrics("/v1/catalog/register", s.wrap(s.CatalogRegister)) + handleFuncMetrics("/v1/catalog/deregister", s.wrap(s.CatalogDeregister)) + handleFuncMetrics("/v1/catalog/datacenters", s.wrap(s.CatalogDatacenters)) + handleFuncMetrics("/v1/catalog/nodes", s.wrap(s.CatalogNodes)) + handleFuncMetrics("/v1/catalog/services", s.wrap(s.CatalogServices)) + handleFuncMetrics("/v1/catalog/service/", s.wrap(s.CatalogServiceNodes)) + handleFuncMetrics("/v1/catalog/node/", s.wrap(s.CatalogNodeServices)) if !s.agent.config.DisableCoordinates { - s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) - s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) + handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(s.CoordinateDatacenters)) + handleFuncMetrics("/v1/coordinate/nodes", s.wrap(s.CoordinateNodes)) } else { - s.handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) - s.handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) + handleFuncMetrics("/v1/coordinate/datacenters", s.wrap(coordinateDisabled)) + handleFuncMetrics("/v1/coordinate/nodes", s.wrap(coordinateDisabled)) } - s.handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) - s.handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) - s.handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) - s.handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) - s.handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) - s.handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) - s.handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) - s.handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) - s.handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) - s.handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) - s.handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) - s.handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) - s.handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) - s.handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) - s.handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth)) - s.handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) - s.handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) - s.handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) - s.handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) - s.handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) - s.handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) - s.handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) - s.handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) - s.handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) - s.handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) - s.handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot)) - s.handleFuncMetrics("/v1/txn", s.wrap(s.Txn)) + handleFuncMetrics("/v1/event/fire/", s.wrap(s.EventFire)) + handleFuncMetrics("/v1/event/list", s.wrap(s.EventList)) + handleFuncMetrics("/v1/health/node/", s.wrap(s.HealthNodeChecks)) + handleFuncMetrics("/v1/health/checks/", s.wrap(s.HealthServiceChecks)) + handleFuncMetrics("/v1/health/state/", s.wrap(s.HealthChecksInState)) + handleFuncMetrics("/v1/health/service/", s.wrap(s.HealthServiceNodes)) + handleFuncMetrics("/v1/internal/ui/nodes", s.wrap(s.UINodes)) + handleFuncMetrics("/v1/internal/ui/node/", s.wrap(s.UINodeInfo)) + handleFuncMetrics("/v1/internal/ui/services", s.wrap(s.UIServices)) + handleFuncMetrics("/v1/kv/", s.wrap(s.KVSEndpoint)) + handleFuncMetrics("/v1/operator/raft/configuration", s.wrap(s.OperatorRaftConfiguration)) + handleFuncMetrics("/v1/operator/raft/peer", s.wrap(s.OperatorRaftPeer)) + handleFuncMetrics("/v1/operator/keyring", s.wrap(s.OperatorKeyringEndpoint)) + handleFuncMetrics("/v1/operator/autopilot/configuration", s.wrap(s.OperatorAutopilotConfiguration)) + handleFuncMetrics("/v1/operator/autopilot/health", s.wrap(s.OperatorServerHealth)) + handleFuncMetrics("/v1/query", s.wrap(s.PreparedQueryGeneral)) + handleFuncMetrics("/v1/query/", s.wrap(s.PreparedQuerySpecific)) + handleFuncMetrics("/v1/session/create", s.wrap(s.SessionCreate)) + handleFuncMetrics("/v1/session/destroy/", s.wrap(s.SessionDestroy)) + handleFuncMetrics("/v1/session/renew/", s.wrap(s.SessionRenew)) + handleFuncMetrics("/v1/session/info/", s.wrap(s.SessionGet)) + handleFuncMetrics("/v1/session/node/", s.wrap(s.SessionsForNode)) + handleFuncMetrics("/v1/session/list", s.wrap(s.SessionList)) + handleFuncMetrics("/v1/status/leader", s.wrap(s.StatusLeader)) + handleFuncMetrics("/v1/status/peers", s.wrap(s.StatusPeers)) + handleFuncMetrics("/v1/snapshot", s.wrap(s.Snapshot)) + handleFuncMetrics("/v1/txn", s.wrap(s.Txn)) // Debug endpoints. if enableDebug { - s.handleFuncMetrics("/debug/pprof/", pprof.Index) - s.handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline) - s.handleFuncMetrics("/debug/pprof/profile", pprof.Profile) - s.handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol) + handleFuncMetrics("/debug/pprof/", pprof.Index) + handleFuncMetrics("/debug/pprof/cmdline", pprof.Cmdline) + handleFuncMetrics("/debug/pprof/profile", pprof.Profile) + handleFuncMetrics("/debug/pprof/symbol", pprof.Symbol) } // Use the custom UI dir if provided. if s.agent.config.UIDir != "" { - s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir)))) + mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(http.Dir(s.agent.config.UIDir)))) } else if s.agent.config.EnableUI { - s.mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS()))) + mux.Handle("/ui/", http.StripPrefix("/ui/", http.FileServer(assetFS()))) } + return mux } // wrap is used to wrap functions to make them more convenient @@ -306,7 +167,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque // Obfuscate any tokens from appearing in the logs formVals, err := url.ParseQuery(req.URL.RawQuery) if err != nil { - s.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr) + s.agent.logger.Printf("[ERR] http: Failed to decode query: %s from=%s", err, req.RemoteAddr) resp.WriteHeader(http.StatusInternalServerError) // 500 return } @@ -322,7 +183,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque } handleErr := func(err error) { - s.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr) + s.agent.logger.Printf("[ERR] http: Request %s %v, error: %v from=%s", req.Method, logURL, err, req.RemoteAddr) code := http.StatusInternalServerError // 500 errMsg := err.Error() if strings.Contains(errMsg, "Permission denied") || strings.Contains(errMsg, "ACL not found") { @@ -344,7 +205,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque // Invoke the handler start := time.Now() defer func() { - s.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr) + s.agent.logger.Printf("[DEBUG] http: Request %s %v (%v) from=%s", req.Method, logURL, time.Now().Sub(start), req.RemoteAddr) }() obj, err := handler(resp, req) if err != nil { diff --git a/command/agent/http_test.go b/command/agent/http_test.go index db4fa1068e..d759fef7b7 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -51,26 +51,13 @@ func makeHTTPServerWithACLs(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) { - configTry := 0 -RECONF: - configTry++ conf := nextConfig() if cb != nil { cb(conf) } dir, agent := makeAgentLog(t, conf, l, logWriter) - servers, err := NewHTTPServers(agent) - if err != nil { - if configTry < 3 { - goto RECONF - } - t.Fatalf("err: %v", err) - } - if len(servers) == 0 { - t.Fatalf(fmt.Sprintf("Failed to make HTTP server")) - } - return dir, servers[0] + return dir, agent.httpServers[0] } func TestHTTPServer_UnixSocket(t *testing.T) { @@ -91,7 +78,6 @@ func TestHTTPServer_UnixSocket(t *testing.T) { c.UnixSockets.Perms = "0777" }) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Ensure the socket was created @@ -158,12 +144,9 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) { dir, agent := makeAgent(t, conf) defer os.RemoveAll(dir) + defer agent.Shutdown() - // Try to start the server with the same path anyways. - if _, err := NewHTTPServers(agent); err != nil { - t.Fatalf("err: %s", err) - } - + defer agent.Shutdown() // Ensure the file was replaced by the socket fi, err = os.Stat(socket) if err != nil { @@ -238,7 +221,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) { { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -260,7 +242,6 @@ func TestHTTPAPI_TranslateAddrHeader(t *testing.T) { dir, srv := makeHTTPServer(t) srv.agent.config.TranslateWanAddrs = true defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -286,7 +267,6 @@ func TestHTTPAPIResponseHeaders(t *testing.T) { } defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -313,7 +293,6 @@ func TestContentTypeIsJSON(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() resp := httptest.NewRecorder() @@ -336,12 +315,11 @@ func TestContentTypeIsJSON(t *testing.T) { func TestHTTP_wrap_obfuscateLog(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Attach a custom logger so we can inspect it buf := &bytes.Buffer{} - srv.logger = log.New(buf, "", log.LstdFlags) + srv.agent.logger = log.New(buf, "", log.LstdFlags) resp := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/some/url?token=secret1&token=secret2", nil) @@ -367,7 +345,6 @@ func TestPrettyPrintBare(t *testing.T) { func testPrettyPrint(pretty string, t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() r := &structs.DirEntry{Key: "key"} @@ -396,7 +373,6 @@ func testPrettyPrint(pretty string, t *testing.T) { func TestParseSource(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Default is agent's DC and no node (since the user didn't care, then @@ -578,7 +554,7 @@ func TestEnableWebUI(t *testing.T) { req, _ := http.NewRequest("GET", "/ui/", nil) // Perform the request resp := httptest.NewRecorder() - s.mux.ServeHTTP(resp, req) + s.Handler.ServeHTTP(resp, req) // Check the result if resp.Code != 200 { @@ -626,7 +602,6 @@ func httpTest(t *testing.T, f func(srv *HTTPServer)) { func httpTestWithConfig(t *testing.T, f func(srv *HTTPServer), cb func(c *Config)) { dir, srv := makeHTTPServerWithConfig(t, cb) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") f(srv) diff --git a/command/agent/kvs_endpoint_test.go b/command/agent/kvs_endpoint_test.go index d66acbd7f5..7701006b25 100644 --- a/command/agent/kvs_endpoint_test.go +++ b/command/agent/kvs_endpoint_test.go @@ -16,7 +16,6 @@ import ( func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -78,7 +77,6 @@ func TestKVSEndpoint_PUT_GET_DELETE(t *testing.T) { func TestKVSEndpoint_Recurse(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -157,7 +155,6 @@ func TestKVSEndpoint_Recurse(t *testing.T) { func TestKVSEndpoint_DELETE_CAS(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -226,7 +223,6 @@ func TestKVSEndpoint_DELETE_CAS(t *testing.T) { func TestKVSEndpoint_CAS(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -305,7 +301,6 @@ func TestKVSEndpoint_CAS(t *testing.T) { func TestKVSEndpoint_ListKeys(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") diff --git a/command/agent/listen.go b/command/agent/listen.go new file mode 100644 index 0000000000..ebe984f2e9 --- /dev/null +++ b/command/agent/listen.go @@ -0,0 +1,61 @@ +package agent + +import ( + "crypto/tls" + "fmt" + "net" + "os" + "time" +) + +func ListenTCP(addr string) (net.Listener, error) { + l, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + l = tcpKeepAliveListener{l.(*net.TCPListener)} + return l, nil +} + +func ListenTLS(addr string, cfg *tls.Config) (net.Listener, error) { + l, err := ListenTCP(addr) + if err != nil { + return nil, err + } + return tls.NewListener(l, cfg), nil +} + +func ListenUnix(addr string, perm FilePermissions) (net.Listener, error) { + // todo(fs): move this somewhere else + // if _, err := os.Stat(addr); !os.IsNotExist(err) { + // s.agent.logger.Printf("[WARN] agent: Replacing socket %q", addr) + // } + if err := os.Remove(addr); err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("error removing socket file: %s", err) + } + l, err := net.Listen("unix", addr) + if err != nil { + return nil, err + } + if err := setFilePermissions(addr, perm); err != nil { + return nil, fmt.Errorf("Failed setting up HTTP socket: %s", err) + } + return l, nil +} + +// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted +// connections. It's used by NewHttpServer so +// dead TCP connections eventually go away. +type tcpKeepAliveListener struct { + *net.TCPListener +} + +func (ln tcpKeepAliveListener) Accept() (c net.Conn, err error) { + tc, err := ln.AcceptTCP() + if err != nil { + return + } + tc.SetKeepAlive(true) + tc.SetKeepAlivePeriod(30 * time.Second) + return tc, nil +} diff --git a/command/agent/status_endpoint_test.go b/command/agent/status_endpoint_test.go index 74a4341fd7..0e354d9774 100644 --- a/command/agent/status_endpoint_test.go +++ b/command/agent/status_endpoint_test.go @@ -10,7 +10,6 @@ import ( func TestStatusLeader(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -28,7 +27,6 @@ func TestStatusLeader(t *testing.T) { func TestStatusPeers(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() obj, err := srv.StatusPeers(nil, nil) diff --git a/command/agent/ui_endpoint_test.go b/command/agent/ui_endpoint_test.go index 75fdd593c2..83dd637468 100644 --- a/command/agent/ui_endpoint_test.go +++ b/command/agent/ui_endpoint_test.go @@ -29,7 +29,6 @@ func TestUiIndex(t *testing.T) { c.UIDir = uiDir }) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() // Create file @@ -41,7 +40,7 @@ func TestUiIndex(t *testing.T) { // Register node req, _ := http.NewRequest("GET", "/ui/my-file", nil) req.URL.Scheme = "http" - req.URL.Host = srv.listener.Addr().String() + req.URL.Host = srv.Addr // Make the request client := cleanhttp.DefaultClient() @@ -66,7 +65,6 @@ func TestUiIndex(t *testing.T) { func TestUiNodes(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") @@ -106,7 +104,6 @@ func TestUiNodes(t *testing.T) { func TestUiNodeInfo(t *testing.T) { dir, srv := makeHTTPServer(t) defer os.RemoveAll(dir) - defer srv.Shutdown() defer srv.agent.Shutdown() testrpc.WaitForLeader(t, srv.agent.RPC, "dc1") diff --git a/command/util_test.go b/command/util_test.go index b81282b460..4cace08a49 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -20,8 +20,6 @@ import ( "github.com/mitchellh/cli" ) -var offset uint64 - func init() { // Seed the random number generator rand.Seed(time.Now().UnixNano()) @@ -29,25 +27,23 @@ func init() { version.Version = "0.8.0" } -type agentWrapper struct { - dir string - config *agent.Config +type server struct { agent *agent.Agent - http *agent.HTTPServer + config *agent.Config httpAddr string + dir string } -func (a *agentWrapper) Shutdown() { +func (a *server) Shutdown() { a.agent.Shutdown() - a.http.Shutdown() os.RemoveAll(a.dir) } -func testAgent(t *testing.T) *agentWrapper { +func testAgent(t *testing.T) *server { return testAgentWithConfig(t, nil) } -func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) { +func testAgentWithAPIClient(t *testing.T) (*server, *api.Client) { agent := testAgentWithConfig(t, func(c *agent.Config) {}) client, err := api.NewClient(&api.Config{Address: agent.httpAddr}) if err != nil { @@ -56,71 +52,54 @@ func testAgentWithAPIClient(t *testing.T) (*agentWrapper, *api.Client) { return agent, client } -func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { +func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *server { return testAgentWithConfigReload(t, cb, nil) } -func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *agentWrapper { - lw := logger.NewLogWriter(512) +func testAgentWithConfigReload(t *testing.T, cb func(c *agent.Config), reloadCh chan chan error) *server { conf := nextConfig() if cb != nil { cb(conf) } - dir := testutil.TempDir(t, "agent") - conf.DataDir = dir - - a, err := agent.Create(conf, lw, nil, reloadCh) + conf.DataDir = testutil.TempDir(t, "agent") + a, err := agent.Create(conf, logger.NewLogWriter(512), nil, reloadCh) if err != nil { - os.RemoveAll(dir) - t.Fatalf(fmt.Sprintf("err: %v", err)) + os.RemoveAll(conf.DataDir) + t.Fatalf("err: %v", err) } conf.Addresses.HTTP = "127.0.0.1" - httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) - http, err := agent.NewHTTPServers(a) - if err != nil { - os.RemoveAll(dir) - t.Fatalf(fmt.Sprintf("err: %v", err)) - } - - if http == nil || len(http) == 0 { - os.RemoveAll(dir) - t.Fatalf(fmt.Sprintf("Could not create HTTP server to listen on: %s", httpAddr)) - } - - return &agentWrapper{ - dir: dir, - config: conf, - agent: a, - http: http[0], - httpAddr: httpAddr, - } + addr := fmt.Sprintf("%s:%d", conf.Addresses.HTTP, conf.Ports.HTTP) + return &server{agent: a, config: conf, httpAddr: addr, dir: conf.DataDir} } -func nextConfig() *agent.Config { - idx := int(atomic.AddUint64(&offset, 1)) - conf := agent.DefaultConfig() +var nextPort uint64 = 10000 +func nextConfig() *agent.Config { nodeID, err := uuid.GenerateUUID() if err != nil { panic(err) } + port := int(atomic.AddUint64(&nextPort, 10)) + + conf := agent.DefaultConfig() conf.Bootstrap = true conf.Datacenter = "dc1" - conf.NodeName = fmt.Sprintf("Node %d", idx) + conf.NodeName = fmt.Sprintf("Node %d", port) conf.NodeID = types.NodeID(nodeID) conf.BindAddr = "127.0.0.1" conf.Server = true - conf.Version = version.Version - - conf.Ports.HTTP = 10000 + 10*idx - conf.Ports.HTTPS = 10401 + 10*idx - conf.Ports.SerfLan = 10201 + 10*idx - conf.Ports.SerfWan = 10202 + 10*idx - conf.Ports.Server = 10300 + 10*idx + conf.Ports = agent.PortConfig{ + DNS: port + 1, + HTTP: port + 2, + HTTPS: port + 3, + SerfLan: port + 4, + SerfWan: port + 5, + Server: port + 6, + } cons := consul.DefaultConfig() conf.ConsulConfig = cons