// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package agent

import (
	"context"
	"errors"
	"fmt"
	"io"
	"net"
	"sync"
	"time"

	"github.com/armon/go-metrics"
	"github.com/armon/go-metrics/prometheus"
	"github.com/hashicorp/go-hclog"
	wal "github.com/hashicorp/raft-wal"
	"github.com/hashicorp/raft-wal/verifier"
	"google.golang.org/grpc/grpclog"

	autoconf "github.com/hashicorp/consul/agent/auto-config"
	"github.com/hashicorp/consul/agent/cache"
	"github.com/hashicorp/consul/agent/config"
	"github.com/hashicorp/consul/agent/consul"
	"github.com/hashicorp/consul/agent/consul/fsm"
	"github.com/hashicorp/consul/agent/consul/rate"
	"github.com/hashicorp/consul/agent/consul/stream"
	"github.com/hashicorp/consul/agent/consul/usagemetrics"
	"github.com/hashicorp/consul/agent/consul/xdscapacity"
	"github.com/hashicorp/consul/agent/grpc-external/limiter"
	grpcInt "github.com/hashicorp/consul/agent/grpc-internal"
	"github.com/hashicorp/consul/agent/grpc-internal/balancer"
	"github.com/hashicorp/consul/agent/grpc-internal/resolver"
	grpcWare "github.com/hashicorp/consul/agent/grpc-middleware"
	"github.com/hashicorp/consul/agent/hcp"
	"github.com/hashicorp/consul/agent/leafcert"
	"github.com/hashicorp/consul/agent/local"
	"github.com/hashicorp/consul/agent/pool"
	"github.com/hashicorp/consul/agent/router"
	"github.com/hashicorp/consul/agent/rpc/middleware"
	"github.com/hashicorp/consul/agent/submatview"
	"github.com/hashicorp/consul/agent/token"
	"github.com/hashicorp/consul/agent/xds"
	"github.com/hashicorp/consul/ipaddr"
	"github.com/hashicorp/consul/lib"
	"github.com/hashicorp/consul/lib/hoststats"
	"github.com/hashicorp/consul/logging"
	"github.com/hashicorp/consul/tlsutil"
)

// TODO: BaseDeps should be renamed in the future once more of Agent.Start
// has been moved out in front of Agent.New, and we can better see the setup
// dependencies.
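// BaseDeps groups the dependencies that are constructed before the Agent and
// shared with it (and, through the embedded consul.Deps, with the underlying
// consul server or client).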
type BaseDeps struct {
	consul.Deps // TODO: un-embed

	RuntimeConfig   *config.RuntimeConfig
	MetricsConfig   *lib.MetricsConfig
	AutoConfig      *autoconf.AutoConfig // TODO: use an interface
	Cache           *cache.Cache
	LeafCertManager *leafcert.Manager
	ViewStore       *submatview.Store
	WatchedFiles    []string
	NetRPC          *LazyNetRPC

	deregisterBalancer, deregisterResolver func()
	stopHostCollector                      context.CancelFunc
}

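// NetRPC is the minimal net/rpc client interface needed by dependencies that
// are constructed before the Agent itself (for example the leaf certificate
// manager).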
type NetRPC interface {
	RPC(ctx context.Context, method string, args any, reply any) error
}

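// LazyNetRPC is a NetRPC implementation whose underlying client is injected
// after construction via SetNetRPC, since the real RPC client only becomes
// available once the Agent has been created.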
type LazyNetRPC struct {
	mu  sync.RWMutex
	rpc NetRPC
}

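// SetNetRPC injects the concrete NetRPC implementation.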
func (r *LazyNetRPC) SetNetRPC(rpc NetRPC) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.rpc = rpc
}

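// RPC forwards the call to the injected NetRPC implementation, or returns an
// error if SetNetRPC has not been called yet.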
func (r *LazyNetRPC) RPC(ctx context.Context, method string, args any, reply any) error {
	r.mu.RLock()
	r2 := r.rpc
	r.mu.RUnlock()

	if r2 == nil {
		return errors.New("rpc: initialization ordering error; net-rpc not ready yet")
	}
	return r2.RPC(ctx, method, args, reply)
}

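// ConfigLoader loads the agent's runtime configuration, optionally merging in
// an additional config source (such as one supplied by auto-config).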
type ConfigLoader func(source config.Source) (config.LoadResult, error)

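// NewBaseDeps loads the runtime configuration and constructs the dependencies
// shared by the Agent: logging, telemetry, TLS, caching, connection pools,
// internal gRPC plumbing, and auto-config.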
func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hclog.InterceptLogger) (BaseDeps, error) {
	d := BaseDeps{}
	result, err := configLoader(nil)
	if err != nil {
		return d, err
	}
	d.WatchedFiles = result.WatchedFiles
	d.Experiments = result.RuntimeConfig.Experiments
	cfg := result.RuntimeConfig
	logConf := cfg.Logging
	logConf.Name = logging.Agent

	if providedLogger != nil {
		d.Logger = providedLogger
	} else {
		d.Logger, err = logging.Setup(logConf, logOut)
		if err != nil {
			return d, err
		}
	}

	grpcLogInitOnce.Do(func() {
		grpclog.SetLoggerV2(logging.NewGRPCLogger(cfg.Logging.LogLevel, d.Logger))
	})

	for _, w := range result.Warnings {
		d.Logger.Warn(w)
	}

	cfg.NodeID, err = newNodeIDFromConfig(cfg, d.Logger)
	if err != nil {
		return d, fmt.Errorf("failed to setup node ID: %w", err)
	}

	isServer := result.RuntimeConfig.ServerMode
	gauges, counters, summaries := getPrometheusDefs(cfg, isServer)
	cfg.Telemetry.PrometheusOpts.GaugeDefinitions = gauges
	cfg.Telemetry.PrometheusOpts.CounterDefinitions = counters
	cfg.Telemetry.PrometheusOpts.SummaryDefinitions = summaries

	var extraSinks []metrics.MetricSink
	if cfg.IsCloudEnabled() {
		// This value is set late within newNodeIDFromConfig above.
		cfg.Cloud.NodeID = cfg.NodeID

		d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"))
		if err != nil {
			return d, err
		}
		if d.HCP.Sink != nil {
			extraSinks = append(extraSinks, d.HCP.Sink)
		}
	}

	d.MetricsConfig, err = lib.InitTelemetry(cfg.Telemetry, d.Logger, extraSinks...)
	if err != nil {
		return d, fmt.Errorf("failed to initialize telemetry: %w", err)
	}
	if !cfg.Telemetry.Disable && cfg.Telemetry.EnableHostMetrics {
		ctx, cancel := context.WithCancel(context.Background())
		hoststats.NewCollector(ctx, d.Logger, cfg.DataDir)
		d.stopHostCollector = cancel
	}

	d.TLSConfigurator, err = tlsutil.NewConfigurator(cfg.TLS, d.Logger)
	if err != nil {
		return d, err
	}

	d.RuntimeConfig = cfg
	d.Tokens = new(token.Store)

	cfg.Cache.Logger = d.Logger.Named("cache")
	// cache-types are not registered yet, but they won't be used until the components are started.
	d.Cache = cache.New(cfg.Cache)
	d.ViewStore = submatview.NewStore(d.Logger.Named("viewstore"))
	d.ConnPool = newConnPool(cfg, d.Logger, d.TLSConfigurator)

	d.NetRPC = &LazyNetRPC{}

	// TODO: create leafCertManager in BaseDeps once NetRPC is available without Agent
	d.LeafCertManager = leafcert.NewManager(leafcert.Deps{
		Logger:      d.Logger.Named("leaf-certs"),
		CertSigner:  leafcert.NewNetRPCCertSigner(d.NetRPC),
		RootsReader: leafcert.NewCachedRootsReader(d.Cache, cfg.Datacenter),
		Config: leafcert.Config{
			TestOverrideCAChangeInitialDelay: cfg.ConnectTestCALeafRootChangeSpread,
		},
	})
	// Set the leaf cert manager in the embedded deps type so it can be used by consul servers.
	d.Deps.LeafCertManager = d.LeafCertManager

	agentType := "client"
	if cfg.ServerMode {
		agentType = "server"
	}

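	// Register a per-agent gRPC resolver and balancer for connections to Consul
	// servers; both are deregistered again in Close.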
	resolverBuilder := resolver.NewServerResolverBuilder(resolver.Config{
		AgentType:  agentType,
		Datacenter: cfg.Datacenter,
		// Set the authority to something sufficiently unique so any usage in
		// tests would be self-isolating in the global resolver map, while also
		// not incurring a huge penalty for non-test code.
		Authority: cfg.Datacenter + "." + string(cfg.NodeID),
	})
	resolver.Register(resolverBuilder)
	d.deregisterResolver = func() {
		resolver.Deregister(resolverBuilder.Authority())
	}

	balancerBuilder := balancer.NewBuilder(
		resolverBuilder.Authority(),
		d.Logger.Named("grpc.balancer"),
	)
	balancerBuilder.Register()
	d.deregisterBalancer = balancerBuilder.Deregister

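	// The internal gRPC connection pool dials the servers tracked by the
	// resolver above, using the agent's outgoing TLS configuration.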
	d.GRPCConnPool = grpcInt.NewClientConnPool(grpcInt.ClientConnPoolConfig{
		Servers:               resolverBuilder,
		SrcAddr:               d.ConnPool.SrcAddr,
		TLSWrapper:            grpcInt.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()),
		ALPNWrapper:           grpcInt.ALPNWrapper(d.TLSConfigurator.OutgoingALPNRPCWrapper()),
		UseTLSForDC:           d.TLSConfigurator.UseTLS,
		DialingFromServer:     cfg.ServerMode,
		DialingFromDatacenter: cfg.Datacenter,
	})
	d.LeaderForwarder = resolverBuilder

	d.Router = router.NewRouter(
		d.Logger,
		cfg.Datacenter,
		fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter),
		grpcInt.NewTracker(resolverBuilder, balancerBuilder),
	)

	// this needs to happen prior to creating auto-config as some of the dependencies
	// must also be passed to auto-config
	d, err = initEnterpriseBaseDeps(d, cfg)
	if err != nil {
		return d, err
	}

	acConf := autoconf.Config{
		DirectRPC:        d.ConnPool,
		Logger:           d.Logger,
		Loader:           configLoader,
		ServerProvider:   d.Router,
		TLSConfigurator:  d.TLSConfigurator,
		Cache:            d.Cache,
		LeafCertManager:  d.LeafCertManager,
		Tokens:           d.Tokens,
		EnterpriseConfig: initEnterpriseAutoConfig(d.EnterpriseDeps, cfg),
	}

	d.AutoConfig, err = autoconf.New(acConf)
	if err != nil {
		return d, err
	}

	d.NewRequestRecorderFunc = middleware.NewRequestRecorder
	d.GetNetRPCInterceptorFunc = middleware.GetNetRPCInterceptor

	d.EventPublisher = stream.NewEventPublisher(10 * time.Second)

	d.XDSStreamLimiter = limiter.NewSessionLimiter()

	d.Registry = consul.NewTypeRegistry()

	return d, nil
}

// Close cleans up any state and goroutines associated with bd's members that
// are not handled by something else (e.g. the agent stop channel).
func (bd BaseDeps) Close() {
	bd.AutoConfig.Stop()
	bd.LeafCertManager.Stop()
	bd.MetricsConfig.Cancel()

	for _, fn := range []func(){bd.deregisterBalancer, bd.deregisterResolver, bd.stopHostCollector} {
		if fn != nil {
			fn()
		}
	}
}

// grpcLogInitOnce ensures the gRPC logger is initialized only once: the test
// suite calls NewBaseDeps in many tests, and re-initializing the logger causes
// data races.
var grpcLogInitOnce sync.Once

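// newConnPool builds the net/rpc connection pool used for RPCs to Consul
// servers. Idle-connection lifetime and stream limits are tuned differently
// for servers and clients.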
func newConnPool(config *config.RuntimeConfig, logger hclog.Logger, tls *tlsutil.Configurator) *pool.ConnPool {
	var rpcSrcAddr *net.TCPAddr
	if !ipaddr.IsAny(config.RPCBindAddr) {
		rpcSrcAddr = &net.TCPAddr{IP: config.RPCBindAddr.IP}
	}

	pool := &pool.ConnPool{
		Server:           config.ServerMode,
		SrcAddr:          rpcSrcAddr,
		Logger:           logger.StandardLogger(&hclog.StandardLoggerOptions{InferLevels: true}),
		TLSConfigurator:  tls,
		Datacenter:       config.Datacenter,
		RPCHoldTimeout:   config.RPCHoldTimeout,
		MaxQueryTime:     config.MaxQueryTime,
		DefaultQueryTime: config.DefaultQueryTime,
	}
	pool.SetRPCClientTimeout(config.RPCClientTimeout)
	if config.ServerMode {
		pool.MaxTime = 2 * time.Minute
		pool.MaxStreams = 64
	} else {
		// MaxTime controls how long we keep an idle connection open to a server.
		// 127s was chosen as the first prime above 120s (a prime was chosen
		// arbitrarily) with the intent of reusing connections that are used by
		// once-a-minute cron(8) jobs *and* that use a 60s jitter window (e.g. in
		// vixie cron, job execution can drift by up to 59s per job, or 119s for
		// a once-a-minute cron job).
		pool.MaxTime = 127 * time.Second
		pool.MaxStreams = 32
	}
	return pool
}

// getPrometheusDefs gathers the slices of prometheus definitions declared across
// the agent's packages and appends them into one slice per metric type for the
// Consul agent to pass to go-metrics.
func getPrometheusDefs(cfg *config.RuntimeConfig, isServer bool) ([]prometheus.GaugeDefinition, []prometheus.CounterDefinition, []prometheus.SummaryDefinition) {
	// TODO: "raft..." metrics come from the raft lib and we should migrate these
	// to a telemetry package within it. In the meantime, we define a few here
	// because they're key to monitoring Consul.
	raftGauges := []prometheus.GaugeDefinition{
		{
			Name: []string{"raft", "fsm", "lastRestoreDuration"},
			Help: "This measures how long the last FSM restore (from disk or leader) took.",
		},
		{
			Name: []string{"raft", "leader", "oldestLogAge"},
			Help: "This measures how old the oldest log in the leader's log store is.",
		},
	}

	serverGauges := []prometheus.GaugeDefinition{
		{
			Name: []string{"server", "isLeader"},
			Help: "Tracks if the server is a leader.",
		},
	}

	// Build slice of slices for all gauge definitions
	var gauges = [][]prometheus.GaugeDefinition{
		cache.Gauges,
		consul.RPCGauges,
		consul.SessionGauges,
		grpcWare.StatsGauges,
		xds.StatsGauges,
		usagemetrics.Gauges,
		consul.ReplicationGauges,
		CertExpirationGauges,
		Gauges,
		raftGauges,
		serverGauges,
	}

	if cfg.Telemetry.EnableHostMetrics {
		gauges = append(gauges, hoststats.Gauges)
	}

	// TODO(ffmmm): conditionally add only leader specific metrics to gauges, counters, summaries, etc
	if isServer {
		gauges = append(gauges,
			consul.AutopilotGauges,
			consul.LeaderCertExpirationGauges,
			consul.LeaderPeeringMetrics,
			xdscapacity.StatsGauges,
		)
	}

	if isServer && cfg.RaftLogStoreConfig.Verification.Enabled {
		verifierGauges := make([]prometheus.GaugeDefinition, 0)
		for _, d := range verifier.MetricDefinitions.Gauges {
			verifierGauges = append(verifierGauges, prometheus.GaugeDefinition{
				Name: []string{"raft", "logstore", "verifier", d.Name},
				Help: d.Desc,
			})
		}
		gauges = append(gauges, verifierGauges)
	}

	if isServer &&
		(cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendWAL ||
			cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendDefault) {

		walGauges := make([]prometheus.GaugeDefinition, 0)
		for _, d := range wal.MetricDefinitions.Gauges {
			walGauges = append(walGauges, prometheus.GaugeDefinition{
				Name: []string{"raft", "wal", d.Name},
				Help: d.Desc,
			})
		}
		gauges = append(gauges, walGauges)
	}

	// Flatten definitions
	// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
	var gaugeDefs []prometheus.GaugeDefinition
	for _, g := range gauges {
		// Prepend the metrics prefix (the Consul service namespace) to each definition's name.
		// TODO(kit): Prepending the service to each definition should be handled by go-metrics
		var withService []prometheus.GaugeDefinition
		for _, gauge := range g {
			gauge.Name = append([]string{cfg.Telemetry.MetricsPrefix}, gauge.Name...)
			withService = append(withService, gauge)
		}
		gaugeDefs = append(gaugeDefs, withService...)
	}

	raftCounters := []prometheus.CounterDefinition{
		// TODO(kit): "raft..." metrics come from the raft lib and we should migrate
		// these to a telemetry package within it. In the meantime, we define a few
		// here because they're key to monitoring Consul.
		{
			Name: []string{"raft", "apply"},
			Help: "This counts the number of Raft transactions occurring over the interval.",
		},
		{
			Name: []string{"raft", "state", "candidate"},
			Help: "This increments whenever a Consul server starts an election.",
		},
		{
			Name: []string{"raft", "state", "leader"},
			Help: "This increments whenever a Consul server becomes a leader.",
		},
	}

	var counters = [][]prometheus.CounterDefinition{
		CatalogCounters,
		cache.Counters,
		consul.ACLCounters,
		consul.CatalogCounters,
		consul.ClientCounters,
		consul.RPCCounters,
		grpcWare.StatsCounters,
		local.StateCounters,
		xds.StatsCounters,
		raftCounters,
		rate.Counters,
	}

	// For some unknown reason we add the raft counters above without checking
	// whether this is a server, unlike some of the gauge definitions earlier.
	// We should probably fix that, but we don't want to change behavior right
	// now. If we are a server, add counters for WAL and verifier metrics.
	if isServer && cfg.RaftLogStoreConfig.Verification.Enabled {
		verifierCounters := make([]prometheus.CounterDefinition, 0)
		for _, d := range verifier.MetricDefinitions.Counters {
			verifierCounters = append(verifierCounters, prometheus.CounterDefinition{
				Name: []string{"raft", "logstore", "verifier", d.Name},
				Help: d.Desc,
			})
		}
		counters = append(counters, verifierCounters)
	}
	if isServer &&
		(cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendWAL ||
			cfg.RaftLogStoreConfig.Backend == consul.LogStoreBackendDefault) {
		walCounters := make([]prometheus.CounterDefinition, 0)
		for _, d := range wal.MetricDefinitions.Counters {
			walCounters = append(walCounters, prometheus.CounterDefinition{
				Name: []string{"raft", "wal", d.Name},
				Help: d.Desc,
			})
		}
		counters = append(counters, walCounters)
	}

	// Flatten definitions
	// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
	var counterDefs []prometheus.CounterDefinition
	for _, c := range counters {
		// TODO(kit): Prepending the service to each definition should be handled by go-metrics
		var withService []prometheus.CounterDefinition
		for _, counter := range c {
			counter.Name = append([]string{cfg.Telemetry.MetricsPrefix}, counter.Name...)
			withService = append(withService, counter)
		}
		counterDefs = append(counterDefs, withService...)
	}

	raftSummaries := []prometheus.SummaryDefinition{
		// TODO(kit): "raft..." metrics come from the raft lib and we should migrate
		// these to a telemetry package within it. In the meantime, we define a few
		// here because they're key to monitoring Consul.
		{
			Name: []string{"raft", "commitTime"},
			Help: "This measures the time it takes to commit a new entry to the Raft log on the leader.",
		},
		{
			Name: []string{"raft", "leader", "lastContact"},
			Help: "Measures the time since the leader was last able to contact the follower nodes when checking its leader lease.",
		},
		{
			Name: []string{"raft", "snapshot", "persist"},
			Help: "Measures the time it takes raft to write a new snapshot to disk.",
		},
		{
			Name: []string{"raft", "rpc", "installSnapshot"},
			Help: "Measures the time it takes the raft leader to install a snapshot on a follower that is catching up after being down or has just joined the cluster.",
		},
	}

	var summaries = [][]prometheus.SummaryDefinition{
		HTTPSummaries,
		consul.ACLSummaries,
		consul.ACLEndpointSummaries,
		consul.CatalogSummaries,
		consul.FederationStateSummaries,
		consul.IntentionSummaries,
		consul.KVSummaries,
		consul.LeaderSummaries,
		consul.PreparedQuerySummaries,
		consul.RPCSummaries,
		consul.SegmentCESummaries,
		consul.SessionSummaries,
		consul.SessionEndpointSummaries,
		consul.TxnSummaries,
		fsm.CommandsSummaries,
		fsm.SnapshotSummaries,
		raftSummaries,
		xds.StatsSummaries,
	}
	// Flatten definitions
	// NOTE(kit): Do we actually want to create a set here so we can ensure definition names are unique?
	var summaryDefs []prometheus.SummaryDefinition
	for _, s := range summaries {
		// TODO(kit): Prepending the service to each definition should be handled by go-metrics
		var withService []prometheus.SummaryDefinition
		for _, summary := range s {
			summary.Name = append([]string{cfg.Telemetry.MetricsPrefix}, summary.Name...)
			withService = append(withService, summary)
		}
		summaryDefs = append(summaryDefs, withService...)
	}

	return gaugeDefs, counterDefs, summaryDefs
}