From 6977e40077587e8c024f850cdacbf2ec595758c1 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Sat, 24 Jun 2017 12:52:41 -0700 Subject: [PATCH] Fixes watch tracking during reloads and fixes address issue. (#3189) This patch fixes watch registration through the config file and a broken log line when the watch registration fails. It also plumbs all the watch loading through a common function and tweaks the unit test to create the watch before the reload. --- agent/agent.go | 73 +++++++++++++++--------------------- agent/agent_endpoint_test.go | 27 ++++++++++--- agent/config.go | 2 +- command/agent.go | 7 ++-- watch/plan.go | 6 +++ 5 files changed, 63 insertions(+), 52 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 26f0416321..e2e5a7af42 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -29,7 +29,6 @@ import ( "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" "github.com/hashicorp/consul/watch" - multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-sockaddr/template" "github.com/hashicorp/go-uuid" "github.com/hashicorp/raft" @@ -174,6 +173,10 @@ type Agent struct { // wgServers is the wait group for all HTTP and DNS servers wgServers sync.WaitGroup + + // watchPlans tracks all the currently-running watch plans for the + // agent. + watchPlans []*watch.Plan } func New(c *Config) (*Agent, error) { @@ -317,7 +320,7 @@ func (a *Agent) Start() error { } // register watches - if err := a.registerWatches(); err != nil { + if err := a.reloadWatches(a.config); err != nil { return err } @@ -496,11 +499,11 @@ func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error { } } -func (a *Agent) registerWatches() error { - if len(a.config.WatchPlans) == 0 { - return nil - } - addrs, err := a.config.HTTPAddrs() +// reloadWatches stops any existing watch plans and attempts to load the given +// set of watches. +func (a *Agent) reloadWatches(cfg *Config) error { + // Watches use the API to talk to this agent, so that must be enabled. + addrs, err := cfg.HTTPAddrs() if err != nil { return err } @@ -508,7 +511,15 @@ func (a *Agent) registerWatches() error { return fmt.Errorf("watch plans require an HTTP or HTTPS endpoint") } - for _, wp := range a.config.WatchPlans { + // Stop the current watches. + for _, wp := range a.watchPlans { + wp.Stop() + } + a.watchPlans = nil + + // Fire off a goroutine for each new watch plan. + for _, wp := range cfg.WatchPlans { + a.watchPlans = append(a.watchPlans, wp) go func(wp *watch.Plan) { wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"]) wp.LogOutput = a.LogOutput @@ -517,7 +528,7 @@ func (a *Agent) registerWatches() error { addr = "unix://" + addr } if err := wp.Run(addr); err != nil { - a.logger.Println("[ERR] Failed to run watch: %v", err) + a.logger.Printf("[ERR] Failed to run watch: %v", err) } }(wp) } @@ -2302,9 +2313,7 @@ func (a *Agent) DisableNodeMaintenance() { a.logger.Printf("[INFO] agent: Node left maintenance mode") } -func (a *Agent) ReloadConfig(newCfg *Config) (bool, error) { - var errs error - +func (a *Agent) ReloadConfig(newCfg *Config) error { // Bulk update the services and checks a.PauseSync() defer a.ResumeSync() @@ -2316,50 +2325,28 @@ func (a *Agent) ReloadConfig(newCfg *Config) (bool, error) { // First unload all checks, services, and metadata. This lets us begin the reload // with a clean slate. if err := a.unloadServices(); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err)) - return false, errs + return fmt.Errorf("Failed unloading services: %s", err) } if err := a.unloadChecks(); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err)) - return false, errs + return fmt.Errorf("Failed unloading checks: %s", err) } a.unloadMetadata() // Reload service/check definitions and metadata. if err := a.loadServices(newCfg); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err)) - return false, errs + return fmt.Errorf("Failed reloading services: %s", err) } if err := a.loadChecks(newCfg); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err)) - return false, errs + return fmt.Errorf("Failed reloading checks: %s", err) } if err := a.loadMetadata(newCfg); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err)) - return false, errs + return fmt.Errorf("Failed reloading metadata: %s", err) } - // Get the new client listener addr - httpAddr, err := newCfg.ClientListener(a.config.Addresses.HTTP, a.config.Ports.HTTP) - if err != nil { - errs = multierror.Append(errs, fmt.Errorf("Failed to determine HTTP address: %v", err)) + // Reload the watches. + if err := a.reloadWatches(newCfg); err != nil { + return fmt.Errorf("Failed reloading watches: %v", err) } - // Deregister the old watches - for _, wp := range a.config.WatchPlans { - wp.Stop() - } - - // Register the new watches - for _, wp := range newCfg.WatchPlans { - go func(wp *watch.Plan) { - wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"]) - wp.LogOutput = a.LogOutput - if err := wp.Run(httpAddr.String()); err != nil { - errs = multierror.Append(errs, fmt.Errorf("Error running watch: %v", err)) - } - }(wp) - } - - return true, errs + return nil } diff --git a/agent/agent_endpoint_test.go b/agent/agent_endpoint_test.go index fe101a4256..31e264ac8f 100644 --- a/agent/agent_endpoint_test.go +++ b/agent/agent_endpoint_test.go @@ -18,6 +18,7 @@ import ( "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/consul/types" + "github.com/hashicorp/consul/watch" "github.com/hashicorp/serf/serf" ) @@ -237,6 +238,19 @@ func TestAgent_Reload(t *testing.T) { cfg.Services = []*structs.ServiceDefinition{ &structs.ServiceDefinition{Name: "redis"}, } + + params := map[string]interface{}{ + "datacenter": "dc1", + "type": "key", + "key": "test", + "handler": "true", + } + wp, err := watch.ParseExempt(params, []string{"handler"}) + if err != nil { + t.Fatalf("Expected watch.Parse to succeed %v", err) + } + cfg.WatchPlans = append(cfg.WatchPlans, wp) + a := NewTestAgent(t.Name(), cfg) defer a.Shutdown() @@ -250,16 +264,19 @@ func TestAgent_Reload(t *testing.T) { &structs.ServiceDefinition{Name: "redis-reloaded"}, } - ok, err := a.ReloadConfig(cfg2) - if err != nil { + if err := a.ReloadConfig(cfg2); err != nil { t.Fatalf("got error %v want nil", err) } - if !ok { - t.Fatalf("got ok %v want true") - } if _, ok := a.state.services["redis-reloaded"]; !ok { t.Fatalf("missing redis-reloaded service") } + + // Verify that previous config's watch plans were stopped. + for _, wp := range cfg.WatchPlans { + if !wp.IsStopped() { + t.Fatalf("Reloading configs should stop watch plans of the previous configuration") + } + } } func TestAgent_Reload_ACLDeny(t *testing.T) { diff --git a/agent/config.go b/agent/config.go index 380739b9b7..1d187df865 100644 --- a/agent/config.go +++ b/agent/config.go @@ -803,7 +803,7 @@ type ProtoAddr struct { } func (p ProtoAddr) String() string { - return p.Proto + "+" + p.Net + "://" + p.Addr + return p.Proto + "://" + p.Addr } func (c *Config) DNSAddrs() ([]ProtoAddr, error) { diff --git a/command/agent.go b/command/agent.go index b44f21668f..3a0aeb4d20 100644 --- a/command/agent.go +++ b/command/agent.go @@ -845,10 +845,11 @@ func (cmd *AgentCommand) handleReload(agent *agent.Agent, cfg *agent.Config) (*a newCfg.LogLevel = cfg.LogLevel } - ok, errs := agent.ReloadConfig(newCfg) - if ok { - return newCfg, errs + if err := agent.ReloadConfig(newCfg); err != nil { + errs = multierror.Append(fmt.Errorf( + "Failed to reload configs: %v", err)) } + return cfg, errs } diff --git a/watch/plan.go b/watch/plan.go index bb939ae663..2b92ca94a4 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -118,3 +118,9 @@ func (p *Plan) shouldStop() bool { return false } } + +func (p *Plan) IsStopped() bool { + p.stopLock.Lock() + defer p.stopLock.Unlock() + return p.stop +}