Merge pull request #1199 from hashicorp/f-scada-reload

SCADA client is reload-able
pull/1201/head
Ryan Uber 2015-08-26 11:46:50 -07:00
commit b0fcb6c234
7 changed files with 168 additions and 39 deletions

View File

@ -25,6 +25,8 @@ func nextConfig() *Config {
idx := int(atomic.AddUint64(&offset, 1))
conf := DefaultConfig()
conf.Version = "a.b"
conf.VersionPrerelease = "c.d"
conf.AdvertiseAddr = "127.0.0.1"
conf.Bootstrap = true
conf.Datacenter = "dc1"

View File

@ -48,6 +48,7 @@ type Command struct {
httpServers []*HTTPServer
dnsServer *DNSServer
scadaProvider *scada.Provider
scadaHttp *HTTPServer
}
// readConfig is responsible for setup of our configuration using
@ -345,20 +346,14 @@ func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *log
c.rpcServer = NewAgentRPC(agent, rpcListener, logOutput, logWriter)
// Enable the SCADA integration
var scadaList net.Listener
if config.AtlasInfrastructure != "" {
provider, list, err := NewProvider(config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}
c.scadaProvider = provider
scadaList = list
if err := c.setupScadaConn(config); err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting SCADA connection: %s", err))
return err
}
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 || scadaList != nil {
servers, err := NewHTTPServers(agent, config, scadaList, logOutput)
if config.Ports.HTTP > 0 || config.Ports.HTTPS > 0 {
servers, err := NewHTTPServers(agent, config, logOutput)
if err != nil {
agent.Shutdown()
c.Ui.Error(fmt.Sprintf("Error starting http servers: %s", err))
@ -684,9 +679,16 @@ AFTER_MIGRATE:
for _, server := range c.httpServers {
defer server.Shutdown()
}
if c.scadaProvider != nil {
defer c.scadaProvider.Shutdown()
}
// Check and shut down the SCADA listeners at the end
defer func() {
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
}()
// Join startup nodes if specified
if err := c.startupJoin(config); err != nil {
@ -904,9 +906,45 @@ func (c *Command) handleReload(config *Config) *Config {
}(wp)
}
// Reload SCADA client if we have a change
if newConf.AtlasInfrastructure != config.AtlasInfrastructure ||
newConf.AtlasToken != config.AtlasToken {
if err := c.setupScadaConn(newConf); err != nil {
c.Ui.Error(fmt.Sprintf("Failed reloading SCADA client: %s", err))
return nil
}
}
return newConf
}
// startScadaClient is used to start a new SCADA provider and listener,
// replacing any existing listeners.
func (c *Command) setupScadaConn(config *Config) error {
// Shut down existing SCADA listeners
if c.scadaProvider != nil {
c.scadaProvider.Shutdown()
}
if c.scadaHttp != nil {
c.scadaHttp.Shutdown()
}
// No-op if we don't have an infrastructure
if config.AtlasInfrastructure == "" {
return nil
}
// Create the new provider and listener
c.Ui.Output("Connecting to Atlas: " + config.AtlasInfrastructure)
provider, list, err := NewProvider(config, c.logOutput)
if err != nil {
return err
}
c.scadaProvider = provider
c.scadaHttp = newScadaHttp(c.agent, list)
return nil
}
func (c *Command) Synopsis() string {
return "Runs a Consul agent"
}

View File

@ -6,6 +6,7 @@ import (
"io/ioutil"
"log"
"os"
"strings"
"testing"
"github.com/hashicorp/consul/testutil"
@ -246,3 +247,55 @@ func TestSetupAgent_RPCUnixSocket_FileExists(t *testing.T) {
t.Fatalf("bad permissions: %s", fi.Mode())
}
}
func TestSetupScadaConn(t *testing.T) {
// Create a config and assign an infra name
conf1 := nextConfig()
conf1.AtlasInfrastructure = "hashicorp/test1"
conf1.AtlasToken = "abc"
dir, agent := makeAgent(t, conf1)
defer os.RemoveAll(dir)
defer agent.Shutdown()
cmd := &Command{
ShutdownCh: make(chan struct{}),
Ui: new(cli.MockUi),
agent: agent,
}
// First start creates the scada conn
if err := cmd.setupScadaConn(conf1); err != nil {
t.Fatalf("err: %s", err)
}
list := cmd.scadaHttp.listener.(*scadaListener)
if list == nil || list.addr.infra != "hashicorp/test1" {
t.Fatalf("bad: %#v", list)
}
http1 := cmd.scadaHttp
provider1 := cmd.scadaProvider
// Performing setup again tears down original and replaces
// with a new SCADA client.
conf2 := nextConfig()
conf2.AtlasInfrastructure = "hashicorp/test2"
conf2.AtlasToken = "123"
if err := cmd.setupScadaConn(conf2); err != nil {
t.Fatalf("err: %s", err)
}
if cmd.scadaHttp == http1 || cmd.scadaProvider == provider1 {
t.Fatalf("should change: %#v %#v", cmd.scadaHttp, cmd.scadaProvider)
}
list = cmd.scadaHttp.listener.(*scadaListener)
if list == nil || list.addr.infra != "hashicorp/test2" {
t.Fatalf("bad: %#v", list)
}
// Original provider and listener must be closed
if !provider1.IsShutdown() {
t.Fatalf("should be shutdown")
}
if _, err := http1.listener.Accept(); !strings.Contains(err.Error(), "closed") {
t.Fatalf("should be closed")
}
}

View File

@ -41,7 +41,7 @@ type HTTPServer struct {
// NewHTTPServers starts new HTTP servers to provide an interface to
// the agent.
func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput io.Writer) ([]*HTTPServer, error) {
func NewHTTPServers(agent *Agent, config *Config, logOutput io.Writer) ([]*HTTPServer, error) {
var servers []*HTTPServer
if config.Ports.HTTPS > 0 {
@ -142,29 +142,30 @@ func NewHTTPServers(agent *Agent, config *Config, scada net.Listener, logOutput
servers = append(servers, srv)
}
if scada != nil {
// Create the mux
mux := http.NewServeMux()
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: scada,
logger: log.New(logOutput, "", log.LstdFlags),
uiDir: config.UiDir,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA
// Start the server
go http.Serve(scada, mux)
servers = append(servers, srv)
}
return servers, nil
}
// newScadaHttp creates a new HTTP server wrapping the SCADA
// listener such that HTTP calls can be sent from the brokers.
func newScadaHttp(agent *Agent, list net.Listener) *HTTPServer {
// Create the mux
mux := http.NewServeMux()
// Create the server
srv := &HTTPServer{
agent: agent,
mux: mux,
listener: list,
logger: agent.logger,
addr: scadaHTTPAddr,
}
srv.registerHandlers(false) // Never allow debug for SCADA
// Start the server
go http.Serve(list, mux)
return srv
}
// tcpKeepAliveListener sets TCP keep-alive timeouts on accepted
// connections. It's used by NewHttpServer so
// dead TCP connections eventually go away.

View File

@ -38,7 +38,7 @@ func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPSe
t.Fatalf("err: %v", err)
}
conf.UiDir = uiDir
servers, err := NewHTTPServers(agent, conf, nil, agent.logOutput)
servers, err := NewHTTPServers(agent, conf, agent.logOutput)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -148,7 +148,7 @@ func TestHTTPServer_UnixSocket_FileExists(t *testing.T) {
defer os.RemoveAll(dir)
// Try to start the server with the same path anyways.
if _, err := NewHTTPServers(agent, conf, nil, agent.logOutput); err != nil {
if _, err := NewHTTPServers(agent, conf, agent.logOutput); err != nil {
t.Fatalf("err: %s", err)
}
@ -516,6 +516,39 @@ func TestACLResolution(t *testing.T) {
})
}
func TestScadaHTTP(t *testing.T) {
// Create the agent
dir, agent := makeAgent(t, nextConfig())
defer os.RemoveAll(dir)
defer agent.Shutdown()
// Create a generic listener
list, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("err: %s", err)
}
defer list.Close()
// Create the SCADA HTTP server
scadaHttp := newScadaHttp(agent, list)
// Returned server uses the listener and scada addr
if scadaHttp.listener != list {
t.Fatalf("bad listener: %#v", scadaHttp)
}
if scadaHttp.addr != scadaHTTPAddr {
t.Fatalf("expected %v, got: %v", scadaHttp.addr, scadaHTTPAddr)
}
// Check that debug endpoints were not enabled. This will cause
// the serve mux to panic if the routes are already handled.
mockFn := func(w http.ResponseWriter, r *http.Request) {}
scadaHttp.mux.HandleFunc("/debug/pprof/", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/cmdline", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/profile", mockFn)
scadaHttp.mux.HandleFunc("/debug/pprof/symbol", mockFn)
}
// assertIndex tests that X-Consul-Index is set and non-zero
func assertIndex(t *testing.T, resp *httptest.ResponseRecorder) {
header := resp.Header().Get("X-Consul-Index")

View File

@ -70,7 +70,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper {
conf.Addresses.HTTP = "127.0.0.1"
httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP)
http, err := agent.NewHTTPServers(a, conf, nil, os.Stderr)
http, err := agent.NewHTTPServers(a, conf, os.Stderr)
if err != nil {
os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err))

View File

@ -643,3 +643,5 @@ items which are reloaded include:
* Services
* Watches
* HTTP Client Address
* Atlas Token
* Atlas Infrastructure