From 2e6ccded2c472812e3bf417f88cd2dcd4581d00a Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 16:40:55 -0700 Subject: [PATCH 1/8] agent: scada client and HTTP server are tracked separately --- command/agent/command.go | 40 ++++++++++++++++++++++++++++++++----- command/agent/http.go | 41 +++++++++++++++++++------------------- command/agent/http_test.go | 4 ++-- command/util_test.go | 2 +- 4 files changed, 58 insertions(+), 29 deletions(-) diff --git a/command/agent/command.go b/command/agent/command.go index 008eed42d8..05f4c4374b 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -48,6 +48,7 @@ type Command struct { httpServers []*HTTPServer dnsServer *DNSServer scadaProvider *scada.Provider + scadaHttp *HTTPServer } // readConfig is responsible for setup of our configuration using @@ -357,8 +358,8 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log scadaList = list } - if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil { - servers, err := NewHTTPServers(agent, config, scadaList, logOutput) + if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 { + servers, err := NewHTTPServers(agent, config, logOutput) if err != nil { agent.Shutdown() c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err)) @@ -367,6 +368,10 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.httpServers = servers } + if scadaList != nil { + c.scadaHttp = newScadaHttp(agent, scadaList) + } + if config.Ports.DNS > 0 { dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS) if err != nil { @@ -684,9 +689,16 @@ AFTER_MIGRATE: for _, server := range c.httpServers { defer server.Shutdown() } - if c.scadaProvider != nil { - defer c.scadaProvider.Shutdown() - } + + // Check and shut down the SCADA listeners at the end + defer func() { + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + if c.scadaProvider != nil { + c.scadaProvider.Shutdown() + } + }() // Join startup nodes if specified if err := c.startupJoin(config); err != nil { @@ -904,6 +916,24 @@ func (c *Command) handleReload(config *Config) *Config { }(wp) } + // Reload the SCADA client + if c.scadaProvider != nil { + // Shut down the existing SCADA listeners + c.scadaProvider.Shutdown() + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + + // Create the new provider and listener + provider, list, err := NewProvider(newConf, c.logOutput) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) + return nil + } + c.scadaProvider = provider + c.scadaHttp = newScadaHttp(c.agent, list) + } + return newConf } diff --git a/command/agent/http.go b/command/agent/http.go index 8e5ea7c24e..7b99286ea1 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -41,7 +41,7 @@ type HTTPServer struct { // NewHTTPServers starts new HTTP servers to provide an interface to // the agent. -func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) { +func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) { var servers []*HTTPServer if config.Ports.HTTPS > 0 { @@ -142,29 +142,28 @@ func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput servers = append(servers, srv) } - if scada != nil { - // Create the mux - mux := http.NewServeMux() - - // Create the server - srv := &HTTPServer{ - agent: agent, - mux: mux, - listener: scada, - logger: log.New(logOutput, "", log.LstdFlags), - uiDir: config.UiDir, - addr: scadaHTTPAddr, - } - srv.registerHandlers(false) // Never allow debug for SCADA - - // Start the server - go http.Serve(scada, mux) - servers = append(servers, srv) - } - return servers, nil } +func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer { + // Create the mux + mux := http.NewServeMux() + + // Create the server + srv := &HTTPServer{ + agent: agent, + mux: mux, + listener: list, + logger: agent.logger, + addr: scadaHTTPAddr, + } + srv.registerHandlers(false) // Never allow debug for SCADA + + // Start the server + go http.Serve(list, mux) + return srv +} + // tcpKeepAliveListener sets TCP keep-alive timeouts on accepted // connections. It's used by NewHttpServer so // dead TCP connections eventually go away. diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 7adff75fb7..5923b24a87 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -38,7 +38,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe t.Fatalf("err: %v", err) } conf.UiDir = uiDir - servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput) + servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { t.Fatalf("err: %v", err) } @@ -148,7 +148,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) { defer os.RemoveAll(dir) // Try to start the server with the same path anyways. - if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil { + if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil { t.Fatalf("err: %s", err) } diff --git a/command/util_test.go b/command/util_test.go index 294d171ec3..a48f33cb0c 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -70,7 +70,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { conf.Addresses.HTTP = "127.0.0.1" httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) - http, err := agent.NewHTTPServers(a, conf, nil, os.Stderr) + http, err := agent.NewHTTPServers(a, conf, os.Stderr) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) From 87c1e4fcd3ddb1255a5b149268d0afb4db1c4a0b Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 17:19:11 -0700 Subject: [PATCH 2/8] agent: document the scada http creation func --- command/agent/http.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/command/agent/http.go b/command/agent/http.go index 7b99286ea1..7a61a0498b 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -145,6 +145,8 @@ func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPS return servers, nil } +// newScadaHttp creates a new HTTP server wrapping the SCADA +// listener such that HTTP calls can be sent from the brokers. func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer { // Create the mux mux := http.NewServeMux() From 96e7b1869cf02b22f468cb60071e7f16c6ac5e82 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 17:21:29 -0700 Subject: [PATCH 3/8] website: add atlas token/infra to reloadable config --- website/source/docs/agent/options.html.markdown | 2 ++ 1 file changed, 2 insertions(+) diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index d84c2cc902..e014dea043 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -643,3 +643,5 @@ items which are reloaded include: * Services * Watches * HTTP Client Address +* Atlas Token +* Atlas Infrastructure From eb8974160fd606391289b0336e1c8a6789b317b5 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 18:27:07 -0700 Subject: [PATCH 4/8] agent: clean up scada connection manager --- command/agent/command.go | 64 +++++++++++++++++++++------------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/command/agent/command.go b/command/agent/command.go index 05f4c4374b..f6d8ac6337 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -346,16 +346,10 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter) // Enable the SCADA integration - var scadaList net.Listener - if config.AtlasInfrastructure != "" { - provider, list, err := NewProvider(config, logOutput) - if err != nil { - agent.Shutdown() - c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err)) - return err - } - c.scadaProvider = provider - scadaList = list + if err := c.setupScadaConn(config); err != nil { + agent.Shutdown() + c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err)) + return err } if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 { @@ -368,10 +362,6 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log c.httpServers = servers } - if scadaList != nil { - c.scadaHttp = newScadaHttp(agent, scadaList) - } - if config.Ports.DNS > 0 { dnsAddr, err := config.ClientListener(config.Addresses.DNS, config.Ports.DNS) if err != nil { @@ -916,27 +906,41 @@ func (c *Command) handleReload(config *Config) *Config { }(wp) } - // Reload the SCADA client - if c.scadaProvider != nil { - // Shut down the existing SCADA listeners - c.scadaProvider.Shutdown() - if c.scadaHttp != nil { - c.scadaHttp.Shutdown() - } - - // Create the new provider and listener - provider, list, err := NewProvider(newConf, c.logOutput) - if err != nil { - c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) - return nil - } - c.scadaProvider = provider - c.scadaHttp = newScadaHttp(c.agent, list) + // Reload SCADA client + if err := c.setupScadaConn(newConf); err != nil { + c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) + return nil } return newConf } +// startScadaClient is used to start a new SCADA provider and listener, +// replacing any existing listeners. +func (c *Command) setupScadaConn(config *Config) error { + // Shut down existing SCADA listeners + if c.scadaProvider != nil { + c.scadaProvider.Shutdown() + } + if c.scadaHttp != nil { + c.scadaHttp.Shutdown() + } + + // No-op if we don't have an infrastructure + if config.AtlasInfrastructure == "" { + return nil + } + + // Create the new provider and listener + provider, list, err := NewProvider(config, c.logOutput) + if err != nil { + return err + } + c.scadaProvider = provider + c.scadaHttp = newScadaHttp(c.agent, list) + return nil +} + func (c *Command) Synopsis() string { return "Runs a Consul agent" } From 52a7206ff3787b5286f5d84f4e7aad57c1fa1fa7 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 18:51:04 -0700 Subject: [PATCH 5/8] agent: test scada HTTP server creation --- command/agent/http_test.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 5923b24a87..9869b258dd 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -516,6 +516,39 @@ func TestACLResolution(t *testing.T) { }) } +func TestScadaHTTP(t *testing.T) { + // Create the agent + dir, agent := makeAgent(t, nextConfig()) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + // Create a generic listener + list, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("err: %s", err) + } + defer list.Close() + + // Create the SCADA HTTP server + scadaHttp := newScadaHttp(agent, list) + + // Returned server uses the listener and scada addr + if scadaHttp.listener != list { + t.Fatalf("bad listener: %#v", scadaHttp) + } + if scadaHttp.addr != scadaHTTPAddr { + t.Fatalf("expected %v, got: %v", scadaHttp.addr, scadaHTTPAddr) + } + + // Check that debug endpoints were not enabled. This will cause + // the serve mux to panic if the routes are already handled. + mockFn := func(w http.ResponseWriter, r *http.Request) {} + scadaHttp.mux.HandleFunc("/debug/pprof/", mockFn) + scadaHttp.mux.HandleFunc("/debug/pprof/cmdline", mockFn) + scadaHttp.mux.HandleFunc("/debug/pprof/profile", mockFn) + scadaHttp.mux.HandleFunc("/debug/pprof/symbol", mockFn) +} + // assertIndex tests that X-Consul-Index is set and non-zero func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) { header := resp.Header().Get("X-Consul-Index") From ed70720d55c4b820ff8acd32589f10656b155f48 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 20:22:22 -0700 Subject: [PATCH 6/8] agent: testing scada client creation in command --- command/agent/agent_test.go | 2 ++ command/agent/command_test.go | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index e08fcc51e2..e5fcfd09db 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -25,6 +25,8 @@ func nextConfig() *Config { idx := int(atomic.AddUint64(&offset, 1)) conf := DefaultConfig() + conf.Version = "a.b" + conf.VersionPrerelease = "c.d" conf.AdvertiseAddr = "127.0.0.1" conf.Bootstrap = true conf.Datacenter = "dc1" diff --git a/command/agent/command_test.go b/command/agent/command_test.go index b54bd45c5b..2b8a027f43 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "log" "os" + "strings" "testing" "github.com/hashicorp/consul/testutil" @@ -246,3 +247,55 @@ func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) { t.Fatalf("bad permissions: %s", fi.Mode()) } } + +func TestSetupScadaConn(t *testing.T) { + // Create a config and assign an infra name + conf1 := nextConfig() + conf1.AtlasInfrastructure = "hashicorp/test1" + conf1.AtlasToken = "abc" + + dir, agent := makeAgent(t, conf1) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + cmd := &Command{ + ShutdownCh: make(chan struct{}), + Ui: new(cli.MockUi), + agent: agent, + } + + // First start creates the scada conn + if err := cmd.setupScadaConn(conf1); err != nil { + t.Fatalf("err: %s", err) + } + list := cmd.scadaHttp.listener.(*scadaListener) + if list == nil || list.addr.infra != "hashicorp/test1" { + t.Fatalf("bad: %#v", list) + } + http1 := cmd.scadaHttp + provider1 := cmd.scadaProvider + + // Performing setup again tears down original and replaces + // with a new SCADA client. + conf2 := nextConfig() + conf2.AtlasInfrastructure = "hashicorp/test2" + conf2.AtlasToken = "123" + if err := cmd.setupScadaConn(conf2); err != nil { + t.Fatalf("err: %s", err) + } + if cmd.scadaHttp == http1 || cmd.scadaProvider == provider1 { + t.Fatalf("bad: %#v", cmd) + } + list = cmd.scadaHttp.listener.(*scadaListener) + if list == nil || list.addr.infra != "hashicorp/test2" { + t.Fatalf("bad: %#v", list) + } + + // Original provider and listener must be closed + if !provider1.IsShutdown() { + t.Fatalf("should be shutdown") + } + if _, err := http1.listener.Accept(); !strings.Contains(err.Error(), "closed") { + t.Fatalf("should be closed") + } +} From 4b715a7d2c7efa7cef614eb0d1869e6bf2690b29 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 20:43:57 -0700 Subject: [PATCH 7/8] agent: don't reload scada client if there is no config change --- command/agent/command.go | 11 +++++++---- command/agent/command_test.go | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/command/agent/command.go b/command/agent/command.go index f6d8ac6337..e7c5d2641f 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -906,10 +906,13 @@ func (c *Command) handleReload(config *Config) *Config { }(wp) } - // Reload SCADA client - if err := c.setupScadaConn(newConf); err != nil { - c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) - return nil + // Reload SCADA client if we have a change + if newConf.AtlasInfrastructure != config.AtlasInfrastructure || + newConf.AtlasToken != config.AtlasToken { + if err := c.setupScadaConn(newConf); err != nil { + c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err)) + return nil + } } return newConf diff --git a/command/agent/command_test.go b/command/agent/command_test.go index 2b8a027f43..9839446981 100644 --- a/command/agent/command_test.go +++ b/command/agent/command_test.go @@ -284,7 +284,7 @@ func TestSetupScadaConn(t *testing.T) { t.Fatalf("err: %s", err) } if cmd.scadaHttp == http1 || cmd.scadaProvider == provider1 { - t.Fatalf("bad: %#v", cmd) + t.Fatalf("should change: %#v %#v", cmd.scadaHttp, cmd.scadaProvider) } list = cmd.scadaHttp.listener.(*scadaListener) if list == nil || list.addr.infra != "hashicorp/test2" { From 5ad8bfbd41e9f5dbf0159fe22b1c57b41fa49622 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Tue, 25 Aug 2015 21:03:16 -0700 Subject: [PATCH 8/8] agent: log a message when making a new scada connection --- command/agent/command.go | 1 + 1 file changed, 1 insertion(+) diff --git a/command/agent/command.go b/command/agent/command.go index e7c5d2641f..4805345895 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -935,6 +935,7 @@ func (c *Command) setupScadaConn(config *Config) error { } // Create the new provider and listener + c.Ui.Output("Connecting to Atlas: " + config.AtlasInfrastructure) provider, list, err := NewProvider(config, c.logOutput) if err != nil { return err