mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
438 lines
12 KiB
438 lines
12 KiB
package agent |
|
|
|
import ( |
|
"flag" |
|
"fmt" |
|
"io" |
|
"log" |
|
"os" |
|
"os/signal" |
|
"path/filepath" |
|
"strings" |
|
"syscall" |
|
"time" |
|
|
|
"github.com/hashicorp/consul/agent" |
|
"github.com/hashicorp/consul/agent/config" |
|
"github.com/hashicorp/consul/command/flags" |
|
"github.com/hashicorp/consul/lib" |
|
"github.com/hashicorp/consul/logger" |
|
"github.com/hashicorp/consul/service_os" |
|
"github.com/hashicorp/go-checkpoint" |
|
multierror "github.com/hashicorp/go-multierror" |
|
"github.com/hashicorp/logutils" |
|
"github.com/mitchellh/cli" |
|
"google.golang.org/grpc/grpclog" |
|
) |
|
|
|
func New(ui cli.Ui, revision, version, versionPre, versionHuman string, shutdownCh <-chan struct{}) *cmd { |
|
ui = &cli.PrefixedUi{ |
|
OutputPrefix: "==> ", |
|
InfoPrefix: " ", |
|
ErrorPrefix: "==> ", |
|
Ui: ui, |
|
} |
|
|
|
c := &cmd{ |
|
UI: ui, |
|
revision: revision, |
|
version: version, |
|
versionPrerelease: versionPre, |
|
versionHuman: versionHuman, |
|
shutdownCh: shutdownCh, |
|
} |
|
c.init() |
|
return c |
|
} |
|
|
|
// AgentCommand is a Command implementation that runs a Consul agent. |
|
// The command will not end unless a shutdown message is sent on the |
|
// ShutdownCh. If two messages are sent on the ShutdownCh it will forcibly |
|
// exit. |
|
type cmd struct { |
|
UI cli.Ui |
|
flags *flag.FlagSet |
|
http *flags.HTTPFlags |
|
help string |
|
revision string |
|
version string |
|
versionPrerelease string |
|
versionHuman string |
|
shutdownCh <-chan struct{} |
|
flagArgs config.Flags |
|
logFilter *logutils.LevelFilter |
|
logOutput io.Writer |
|
logger *log.Logger |
|
} |
|
|
|
func (c *cmd) init() { |
|
c.flags = flag.NewFlagSet("", flag.ContinueOnError) |
|
config.AddFlags(c.flags, &c.flagArgs) |
|
c.help = flags.Usage(help, c.flags) |
|
} |
|
|
|
func (c *cmd) Run(args []string) int { |
|
code := c.run(args) |
|
if c.logger != nil { |
|
c.logger.Println("[INFO] agent: Exit code:", code) |
|
} |
|
return code |
|
} |
|
|
|
// readConfig is responsible for setup of our configuration using |
|
// the command line and any file configs |
|
func (c *cmd) readConfig() *config.RuntimeConfig { |
|
b, err := config.NewBuilder(c.flagArgs) |
|
if err != nil { |
|
c.UI.Error(err.Error()) |
|
return nil |
|
} |
|
cfg, err := b.BuildAndValidate() |
|
if err != nil { |
|
c.UI.Error(err.Error()) |
|
return nil |
|
} |
|
for _, w := range b.Warnings { |
|
c.UI.Warn(w) |
|
} |
|
return &cfg |
|
} |
|
|
|
// checkpointResults is used to handler periodic results from our update checker |
|
func (c *cmd) checkpointResults(results *checkpoint.CheckResponse, err error) { |
|
if err != nil { |
|
c.UI.Error(fmt.Sprintf("Failed to check for updates: %v", err)) |
|
return |
|
} |
|
if results.Outdated { |
|
c.UI.Error(fmt.Sprintf("Newer Consul version available: %s (currently running: %s)", results.CurrentVersion, c.version)) |
|
} |
|
for _, alert := range results.Alerts { |
|
switch alert.Level { |
|
case "info": |
|
c.UI.Info(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL)) |
|
default: |
|
c.UI.Error(fmt.Sprintf("Bulletin [%s]: %s (%s)", alert.Level, alert.Message, alert.URL)) |
|
} |
|
} |
|
} |
|
|
|
func (c *cmd) startupUpdateCheck(config *config.RuntimeConfig) { |
|
version := config.Version |
|
if config.VersionPrerelease != "" { |
|
version += fmt.Sprintf("-%s", config.VersionPrerelease) |
|
} |
|
updateParams := &checkpoint.CheckParams{ |
|
Product: "consul", |
|
Version: version, |
|
} |
|
if !config.DisableAnonymousSignature { |
|
updateParams.SignatureFile = filepath.Join(config.DataDir, "checkpoint-signature") |
|
} |
|
|
|
// Schedule a periodic check with expected interval of 24 hours |
|
checkpoint.CheckInterval(updateParams, 24*time.Hour, c.checkpointResults) |
|
|
|
// Do an immediate check within the next 30 seconds |
|
go func() { |
|
time.Sleep(lib.RandomStagger(30 * time.Second)) |
|
c.checkpointResults(checkpoint.Check(updateParams)) |
|
}() |
|
} |
|
|
|
// startupJoin is invoked to handle any joins specified to take place at start time |
|
func (c *cmd) startupJoin(agent *agent.Agent, cfg *config.RuntimeConfig) error { |
|
if len(cfg.StartJoinAddrsLAN) == 0 { |
|
return nil |
|
} |
|
|
|
c.UI.Output("Joining cluster...") |
|
n, err := agent.JoinLAN(cfg.StartJoinAddrsLAN) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
c.UI.Info(fmt.Sprintf("Join completed. Synced with %d initial agents", n)) |
|
return nil |
|
} |
|
|
|
// startupJoinWan is invoked to handle any joins -wan specified to take place at start time |
|
func (c *cmd) startupJoinWan(agent *agent.Agent, cfg *config.RuntimeConfig) error { |
|
if len(cfg.StartJoinAddrsWAN) == 0 { |
|
return nil |
|
} |
|
|
|
c.UI.Output("Joining -wan cluster...") |
|
n, err := agent.JoinWAN(cfg.StartJoinAddrsWAN) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
c.UI.Info(fmt.Sprintf("Join -wan completed. Synced with %d initial agents", n)) |
|
return nil |
|
} |
|
|
|
func (c *cmd) run(args []string) int { |
|
// Parse our configs |
|
if err := c.flags.Parse(args); err != nil { |
|
if !strings.Contains(err.Error(), "help requested") { |
|
c.UI.Error(fmt.Sprintf("error parsing flags: %v", err)) |
|
} |
|
return 1 |
|
} |
|
c.flagArgs.Args = c.flags.Args() |
|
config := c.readConfig() |
|
if config == nil { |
|
return 1 |
|
} |
|
|
|
// Setup the log outputs |
|
logConfig := &logger.Config{ |
|
LogLevel: config.LogLevel, |
|
EnableSyslog: config.EnableSyslog, |
|
SyslogFacility: config.SyslogFacility, |
|
LogFilePath: config.LogFile, |
|
LogRotateDuration: config.LogRotateDuration, |
|
LogRotateBytes: config.LogRotateBytes, |
|
LogRotateMaxFiles: config.LogRotateMaxFiles, |
|
} |
|
logFilter, logGate, logWriter, logOutput, ok := logger.Setup(logConfig, c.UI) |
|
if !ok { |
|
return 1 |
|
} |
|
c.logFilter = logFilter |
|
c.logOutput = logOutput |
|
c.logger = log.New(logOutput, "", log.LstdFlags) |
|
|
|
// Setup gRPC logger to use the same output/filtering |
|
grpclog.SetLoggerV2(logger.NewGRPCLogger(logConfig, c.logger)) |
|
|
|
memSink, err := lib.InitTelemetry(config.Telemetry) |
|
if err != nil { |
|
c.UI.Error(err.Error()) |
|
return 1 |
|
} |
|
|
|
// Create the agent |
|
c.UI.Output("Starting Consul agent...") |
|
agent, err := agent.New(config, c.logger) |
|
if err != nil { |
|
c.UI.Error(fmt.Sprintf("Error creating agent: %s", err)) |
|
return 1 |
|
} |
|
agent.LogOutput = logOutput |
|
agent.LogWriter = logWriter |
|
agent.MemSink = memSink |
|
|
|
segment := config.SegmentName |
|
if config.ServerMode { |
|
segment = "<all>" |
|
} |
|
|
|
c.UI.Info(fmt.Sprintf(" Version: '%s'", c.versionHuman)) |
|
c.UI.Info(fmt.Sprintf(" Node ID: '%s'", config.NodeID)) |
|
c.UI.Info(fmt.Sprintf(" Node name: '%s'", config.NodeName)) |
|
c.UI.Info(fmt.Sprintf(" Datacenter: '%s' (Segment: '%s')", config.Datacenter, segment)) |
|
c.UI.Info(fmt.Sprintf(" Server: %v (Bootstrap: %v)", config.ServerMode, config.Bootstrap)) |
|
c.UI.Info(fmt.Sprintf(" Client Addr: %v (HTTP: %d, HTTPS: %d, gRPC: %d, DNS: %d)", config.ClientAddrs, |
|
config.HTTPPort, config.HTTPSPort, config.GRPCPort, config.DNSPort)) |
|
c.UI.Info(fmt.Sprintf(" Cluster Addr: %v (LAN: %d, WAN: %d)", config.AdvertiseAddrLAN, |
|
config.SerfPortLAN, config.SerfPortWAN)) |
|
c.UI.Info(fmt.Sprintf(" Encrypt: Gossip: %v, TLS-Outgoing: %v, TLS-Incoming: %v, Auto-Encrypt-TLS: %t", |
|
config.EncryptKey != "", config.VerifyOutgoing, config.VerifyIncoming, config.AutoEncryptTLS || config.AutoEncryptAllowTLS)) |
|
|
|
// Enable log streaming |
|
c.UI.Info("") |
|
c.UI.Output("Log data will now stream in as it occurs:\n") |
|
logGate.Flush() |
|
|
|
// wait for signal |
|
signalCh := make(chan os.Signal, 10) |
|
stopCh := make(chan struct{}) |
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGPIPE) |
|
|
|
go func() { |
|
for { |
|
var sig os.Signal |
|
select { |
|
case s := <-signalCh: |
|
sig = s |
|
case <-stopCh: |
|
return |
|
} |
|
|
|
switch sig { |
|
case syscall.SIGPIPE: |
|
continue |
|
|
|
case syscall.SIGHUP: |
|
err := fmt.Errorf("cannot reload before agent started") |
|
c.logger.Printf("[ERR] agent: Caught signal: %s err: %s\n", sig, err) |
|
|
|
default: |
|
c.logger.Println("[INFO] agent: Caught signal: ", sig) |
|
agent.InterruptStartCh <- struct{}{} |
|
return |
|
} |
|
} |
|
}() |
|
|
|
err = agent.Start() |
|
signal.Stop(signalCh) |
|
select { |
|
case stopCh <- struct{}{}: |
|
default: |
|
} |
|
if err != nil { |
|
c.UI.Error(fmt.Sprintf("Error starting agent: %s", err)) |
|
return 1 |
|
} |
|
|
|
// shutdown agent before endpoints |
|
defer agent.ShutdownEndpoints() |
|
defer agent.ShutdownAgent() |
|
|
|
if !config.DisableUpdateCheck && !config.DevMode { |
|
c.startupUpdateCheck(config) |
|
} |
|
|
|
if err := c.startupJoin(agent, config); err != nil { |
|
c.UI.Error(err.Error()) |
|
return 1 |
|
} |
|
|
|
if err := c.startupJoinWan(agent, config); err != nil { |
|
c.UI.Error(err.Error()) |
|
return 1 |
|
} |
|
|
|
// Let the agent know we've finished registration |
|
agent.StartSync() |
|
|
|
c.UI.Output("Consul agent running!") |
|
|
|
// wait for signal |
|
signalCh = make(chan os.Signal, 10) |
|
signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGPIPE) |
|
|
|
for { |
|
var sig os.Signal |
|
var reloadErrCh chan error |
|
select { |
|
case s := <-signalCh: |
|
sig = s |
|
case ch := <-agent.ReloadCh(): |
|
sig = syscall.SIGHUP |
|
reloadErrCh = ch |
|
case <-service_os.Shutdown_Channel(): |
|
sig = os.Interrupt |
|
case <-c.shutdownCh: |
|
sig = os.Interrupt |
|
case err := <-agent.RetryJoinCh(): |
|
c.logger.Println("[ERR] agent: Retry join failed: ", err) |
|
return 1 |
|
case <-agent.ShutdownCh(): |
|
// agent is already down! |
|
return 0 |
|
} |
|
|
|
switch sig { |
|
case syscall.SIGPIPE: |
|
continue |
|
|
|
case syscall.SIGHUP: |
|
c.logger.Println("[INFO] agent: Caught signal: ", sig) |
|
|
|
conf, err := c.handleReload(agent, config) |
|
if conf != nil { |
|
config = conf |
|
} |
|
if err != nil { |
|
c.logger.Println("[ERR] agent: Reload config failed: ", err) |
|
} |
|
// Send result back if reload was called via HTTP |
|
if reloadErrCh != nil { |
|
reloadErrCh <- err |
|
} |
|
|
|
default: |
|
c.logger.Println("[INFO] agent: Caught signal: ", sig) |
|
|
|
graceful := (sig == os.Interrupt && !(config.SkipLeaveOnInt)) || (sig == syscall.SIGTERM && (config.LeaveOnTerm)) |
|
if !graceful { |
|
c.logger.Println("[INFO] agent: Graceful shutdown disabled. Exiting") |
|
return 1 |
|
} |
|
|
|
c.logger.Println("[INFO] agent: Gracefully shutting down agent...") |
|
gracefulCh := make(chan struct{}) |
|
go func() { |
|
if err := agent.Leave(); err != nil { |
|
c.logger.Println("[ERR] agent: Error on leave:", err) |
|
return |
|
} |
|
close(gracefulCh) |
|
}() |
|
|
|
gracefulTimeout := 15 * time.Second |
|
select { |
|
case <-signalCh: |
|
c.logger.Printf("[INFO] agent: Caught second signal %v. Exiting\n", sig) |
|
return 1 |
|
case <-time.After(gracefulTimeout): |
|
c.logger.Println("[INFO] agent: Timeout on graceful leave. Exiting") |
|
return 1 |
|
case <-gracefulCh: |
|
c.logger.Println("[INFO] agent: Graceful exit completed") |
|
return 0 |
|
} |
|
} |
|
} |
|
} |
|
|
|
// handleReload is invoked when we should reload our configs, e.g. SIGHUP |
|
func (c *cmd) handleReload(agent *agent.Agent, cfg *config.RuntimeConfig) (*config.RuntimeConfig, error) { |
|
c.logger.Println("[INFO] agent: Reloading configuration...") |
|
var errs error |
|
newCfg := c.readConfig() |
|
if newCfg == nil { |
|
errs = multierror.Append(errs, fmt.Errorf("Failed to reload configs")) |
|
return cfg, errs |
|
} |
|
|
|
// Change the log level |
|
minLevel := logutils.LogLevel(strings.ToUpper(newCfg.LogLevel)) |
|
if logger.ValidateLevelFilter(minLevel, c.logFilter) { |
|
c.logFilter.SetMinLevel(minLevel) |
|
} else { |
|
errs = multierror.Append(fmt.Errorf( |
|
"Invalid log level: %s. Valid log levels are: %v", |
|
minLevel, c.logFilter.Levels)) |
|
|
|
// Keep the current log level |
|
newCfg.LogLevel = cfg.LogLevel |
|
} |
|
|
|
if err := agent.ReloadConfig(newCfg); err != nil { |
|
errs = multierror.Append(fmt.Errorf( |
|
"Failed to reload configs: %v", err)) |
|
} |
|
|
|
return newCfg, errs |
|
} |
|
|
|
func (c *cmd) Synopsis() string { |
|
return synopsis |
|
} |
|
|
|
func (c *cmd) Help() string { |
|
return c.help |
|
} |
|
|
|
const synopsis = "Runs a Consul agent" |
|
const help = ` |
|
Usage: consul agent [options] |
|
|
|
Starts the Consul agent and runs until an interrupt is received. The |
|
agent represents a single node in a cluster. |
|
`
|
|
|