diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index e08fcc51e2..e5fcfd09db 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -25,6 +25,8 @@ func nextConfig() *Config { idx := int(atomic.AddUint64(&offset, 1)) conf := DefaultConfig() + conf.Version = "a.b" + conf.VersionPrerelease = "c.d" conf.AdvertiseAddr = "127.0.0.1" conf.Bootstrap = true conf.Datacenter = "dc1" diff --git a/command/agent/command.go b/command/agent/command.go index 008eed42d8..4805345895 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -48,6 +48,7 @@ type Command struct { httpServers []*HTTPServer dnsServer *DNSServer scadaProvider *scada.Provider + scadaHttp *HTTPServer } // readConfig is responsible for setup of our configuration using @@ -345,20 +346,14 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter) // Enable the SCADA integration - var scadaList net.Listener - if config.AtlasInfrastructure != "" { - provider, list, err := NewProvider(config, logOutput) - if err != nil { - agent.Shutdown() - c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err)) - return err - } - c.scadaProvider = provider - scadaList = list + if err := c.setupScadaConn(config); err != nil { + agent.Shutdown() + c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err)) + return err } - if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil { - servers, err := NewHTTPServers(agent, config, scadaList, logOutput) + if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 { + servers, err := NewHTTPServers(agent, config, logOutput) if err != nil { agent.Shutdown() c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err)) @@ -684,9 +679,16 @@ AFTER_MIGRATE: for _, server := range c.httpServers { defer server.Shutdown() } - if c.scadaProvider != nil { - defer c.scadaProvider.Shutdown() - } + + // Check and shut down the SCADA listeners at the end + defer func() { + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + if c.scadaProvider != nil { + c.scadaProvider.Shutdown() + } + }() // Join startup nodes if specified if err := c.startupJoin(config); err != nil { @@ -904,9 +906,45 @@ func (c *Command) handleReload(config *Config) *Config { }(wp) } + // Reload SCADA client if we have a change + if newConf.AtlasInfrastructure != config.AtlasInfrastructure || + newConf.AtlasToken != config.AtlasToken { + if err := c.setupScadaConn(newConf); err != nil { + c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) + return nil + } + } + return newConf } +// startScadaClient is used to start a new SCADA provider and listener, +// replacing any existing listeners. +func (c *Command) setupScadaConn(config *Config) error { + // Shut down existing SCADA listeners + if c.scadaProvider != nil { + c.scadaProvider.Shutdown() + } + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + + // No-op if we don't have an infrastructure + if config.AtlasInfrastructure == "" { + return nil + } + + // Create the new provider and listener + c.Ui.Output("Connecting to Atlas: " + config.AtlasInfrastructure) + provider, list, err := NewProvider(config, c.logOutput) + if err != nil { + return err + } + c.scadaProvider = provider + c.scadaHttp = newScadaHttp(c.agent, list) + return nil +} + func (c *Command) Synopsis() string { return "Runs a Consul agent" } diff --git a/command/agent/command_test.go b/command/agent/command_test.go index b54bd45c5b..9839446981 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "os" + "strings" "testing" "github.com/hashicorp/consul/testutil" @@ -246,3 +247,55 @@ func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) { t.Fatalf("bad permissions: %s", fi.Mode()) } } + +func TestSetupScadaConn(t *testing.T) { + // Create a config and assign an infra name + conf1 := nextConfig() + conf1.AtlasInfrastructure = "hashicorp/test1" + conf1.AtlasToken = "abc" + + dir, agent := makeAgent(t, conf1) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + cmd := &Command{ + ShutdownCh: make(chan struct{}), + Ui: new(cli.MockUi), + agent: agent, + } + + // First start creates the scada conn + if err := cmd.setupScadaConn(conf1); err != nil { + t.Fatalf("err: %s", err) + } + list := cmd.scadaHttp.listener.(*scadaListener) + if list == nil || list.addr.infra != "hashicorp/test1" { + t.Fatalf("bad: %#v", list) + } + http1 := cmd.scadaHttp + provider1 := cmd.scadaProvider + + // Performing setup again tears down original and replaces + // with a new SCADA client. + conf2 := nextConfig() + conf2.AtlasInfrastructure = "hashicorp/test2" + conf2.AtlasToken = "123" + if err := cmd.setupScadaConn(conf2); err != nil { + t.Fatalf("err: %s", err) + } + if cmd.scadaHttp == http1 || cmd.scadaProvider == provider1 { + t.Fatalf("should change: %#v %#v", cmd.scadaHttp, cmd.scadaProvider) + } + list = cmd.scadaHttp.listener.(*scadaListener) + if list == nil || list.addr.infra != "hashicorp/test2" { + t.Fatalf("bad: %#v", list) + } + + // Original provider and listener must be closed + if !provider1.IsShutdown() { + t.Fatalf("should be shutdown") + } + if _, err := http1.listener.Accept(); !strings.Contains(err.Error(), "closed") { + t.Fatalf("should be closed") + } +} diff --git a/command/agent/http.go b/command/agent/http.go index 8e5ea7c24e..7a61a0498b 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -41,7 +41,7 @@ type HTTPServer struct { // NewHTTPServers starts new HTTP servers to provide an interface to // the agent. -func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) { +func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) { var servers []*HTTPServer if config.Ports.HTTPS > 0 { @@ -142,29 +142,30 @@ func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput servers = append(servers, srv) } - if scada != nil { - // Create the mux - mux := http.NewServeMux() - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: scada, - logger: log.New(logOutput, "", log.LstdFlags), - uiDir: config.UiDir, - addr: scadaHTTPAddr, - } - srv.registerHandlers(false) // Never allow debug for SCADA - - // Start the server - go http.Serve(scada, mux) - servers = append(servers, srv) - } - return servers, nil } +// newScadaHttp creates a new HTTP server wrapping the SCADA +// listener such that HTTP calls can be sent from the brokers. +func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer { + // Create the mux + mux := http.NewServeMux() + + // Create the server + srv := &HTTPServer{ + agent: agent, + mux: mux, + listener: list, + logger: agent.logger, + addr: scadaHTTPAddr, + } + srv.registerHandlers(false) // Never allow debug for SCADA + + // Start the server + go http.Serve(list, mux) + return srv +} + // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted // connections. It's used by NewHttpServer so // dead TCP connections eventually go away. diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 7adff75fb7..9869b258dd 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -38,7 +38,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe t.Fatalf("err: %v", err) } conf.UiDir = uiDir - servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput) + servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { t.Fatalf("err: %v", err) } @@ -148,7 +148,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) { defer os.RemoveAll(dir) // Try to start the server with the same path anyways. - if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil { + if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil { t.Fatalf("err: %s", err) } @@ -516,6 +516,39 @@ func TestACLResolution(t *testing.T) { }) } +func TestScadaHTTP(t *testing.T) { + // Create the agent + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + // Create a generic listener + list, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("err: %s", err) + } + defer list.Close() + + // Create the SCADA HTTP server + scadaHttp := newScadaHttp(agent, list) + + // Returned server uses the listener and scada addr + if scadaHttp.listener != list { + t.Fatalf("bad listener: %#v", scadaHttp) + } + if scadaHttp.addr != scadaHTTPAddr { + t.Fatalf("expected %v, got: %v", scadaHttp.addr, scadaHTTPAddr) + } + + // Check that debug endpoints were not enabled. This will cause + // the serve mux to panic if the routes are already handled. + mockFn := func(w http.ResponseWriter, r *http.Request) {} + scadaHttp.mux.HandleFunc("/debug/pprof/", mockFn) + scadaHttp.mux.HandleFunc("/debug/pprof/cmdline", mockFn) + scadaHttp.mux.HandleFunc("/debug/pprof/profile", mockFn) + scadaHttp.mux.HandleFunc("/debug/pprof/symbol", mockFn) +} + // assertIndex tests that X-Consul-Index is set and non-zero func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { header := resp.Header().Get("X-Consul-Index") diff --git a/command/util_test.go b/command/util_test.go index 294d171ec3..a48f33cb0c 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -70,7 +70,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { conf.Addresses.HTTP = "127.0.0.1" httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) - http, err := agent.NewHTTPServers(a, conf, nil, os.Stderr) + http, err := agent.NewHTTPServers(a, conf, os.Stderr) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index d84c2cc902..e014dea043 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -643,3 +643,5 @@ items which are reloaded include: * Services * Watches * HTTP Client Address +* Atlas Token +* Atlas Infrastructure